/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon.tasks;

import com.google.inject.Inject;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
import org.jooq.DSLContext;
import org.jooq.Record3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;

/**
 * Class to iterate over the OM DB and store the counts of existing/new
 * files binned into size ranges (1 KB, 2 KB, ..., 4 MB, ..., 1 TB, ..., 1 PB)
 * in the Recon fileSize DB.
 */
public class FileSizeCountTask implements ReconOmTask {

  private static final Logger LOG =
      LoggerFactory.getLogger(FileSizeCountTask.class);
  // 1125899906842624L = 2^50 bytes = 1 PB
private static final long MAX_FILE_SIZE_UPPER_BOUND = 1125899906842624L;
  private FileCountBySizeDao fileCountBySizeDao;
  private DSLContext dslContext;

  @Inject
public FileSizeCountTask(FileCountBySizeDao fileCountBySizeDao,
UtilizationSchemaDefinition
utilizationSchemaDefinition) {
this.fileCountBySizeDao = fileCountBySizeDao;
this.dslContext = utilizationSchemaDefinition.getDSLContext();
}
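
  /**
   * Returns the number of binary digits in dataSize, i.e. the 1-based index
   * of its highest set bit: 1023 yields 10, 1024 (2^10) yields 11, and 0
   * yields 0.
   */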
private static int nextClosestPowerIndexOfTwo(long dataSize) {
int index = 0;
    while (dataSize != 0) {
dataSize >>= 1;
index += 1;
}
return index;
}

  /**
   * Read all the keys from the OM snapshot DB, bin each key by the upper
   * bound of its file size range, and write the aggregated counts to the
   * Recon DB.
   *
   * @param omMetadataManager OM Metadata instance.
   * @return Pair of task name and task success.
   */
@Override
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable();
Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
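    // Aggregate the counts in memory first; they are flushed to the DB in
    // one pass by writeCountsToDB below.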
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
while (keyIter.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
}
} catch (IOException ioEx) {
LOG.error("Unable to populate File Size Count in Recon DB. ", ioEx);
return new ImmutablePair<>(getTaskName(), false);
}
    // Truncate the table before inserting the freshly computed counts.
    int rowsDeleted = dslContext.delete(FILE_COUNT_BY_SIZE).execute();
    LOG.info("Deleted {} records from {}", rowsDeleted, FILE_COUNT_BY_SIZE);
writeCountsToDB(true, fileSizeCountMap);
LOG.info("Completed a 'reprocess' run of FileSizeCountTask.");
return new ImmutablePair<>(getTaskName(), true);
  }

  @Override
  public String getTaskName() {
    return "FileSizeCountTask";
  }

  public Collection<String> getTaskTables() {
    return Collections.singletonList(KEY_TABLE);
  }

  /**
   * Read the keys from OM update events and update the count of files
   * in the affected file size ranges.
   *
   * @param events Update events - PUT, DELETE and UPDATE.
   * @return Pair of task name and task success.
   */
@Override
public Pair<String, Boolean> process(OMUpdateEventBatch events) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
final Collection<String> taskTables = getTaskTables();
while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
      // Only handle events on the tables this task tracks, so the same
      // event is not counted more than once.
if (!taskTables.contains(omdbUpdateEvent.getTable())) {
continue;
}
String updatedKey = omdbUpdateEvent.getKey();
OmKeyInfo omKeyInfo = omdbUpdateEvent.getValue();
      try {
switch (omdbUpdateEvent.getAction()) {
case PUT:
handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
break;
case DELETE:
handleDeleteKeyEvent(updatedKey, omKeyInfo, fileSizeCountMap);
break;
case UPDATE:
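          // An UPDATE may move the key into a different size bin: decrement
          // the bin of the old value, then increment the bin of the new one.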
handleDeleteKeyEvent(updatedKey, omdbUpdateEvent.getOldValue(),
fileSizeCountMap);
handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
break;
        default:
          LOG.trace("Skipping DB update event: {}",
              omdbUpdateEvent.getAction());
}
} catch (Exception e) {
LOG.error("Unexpected exception while processing key {}.",
updatedKey, e);
return new ImmutablePair<>(getTaskName(), false);
}
}
writeCountsToDB(false, fileSizeCountMap);
LOG.info("Completed a 'process' run of FileSizeCountTask.");
return new ImmutablePair<>(getTaskName(), true);
  }

  private long getFileSizeUpperBound(long fileSize) {
if (fileSize >= MAX_FILE_SIZE_UPPER_BOUND) {
return Long.MAX_VALUE;
}
int index = nextClosestPowerIndexOfTwo(fileSize);
// The smallest file size being tracked for count
// is 1 KB i.e. 1024 = 2 ^ 10.
int binIndex = index < 10 ? 0 : index - 10;
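    // e.g. fileSize = 4096 (2^12) gives index = 13, binIndex = 3 and an
    // upper bound of 2^13 = 8192.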
return (long) Math.pow(2, (10 + binIndex));
}

  /**
   * Write the aggregated file size counts to the Recon DB using the DAO.
   *
   * @param isDbTruncated true if the table was truncated before this call,
   *                      in which case rows are inserted without a lookup.
   * @param fileSizeCountMap counts keyed by (volume, bucket, size bin).
   */
private void writeCountsToDB(boolean isDbTruncated,
Map<FileSizeCountKey, Long> fileSizeCountMap) {
fileSizeCountMap.keySet().forEach((FileSizeCountKey key) -> {
FileCountBySize newRecord = new FileCountBySize();
newRecord.setVolume(key.volume);
newRecord.setBucket(key.bucket);
newRecord.setFileSize(key.fileSizeUpperBound);
newRecord.setCount(fileSizeCountMap.get(key));
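      // In a 'process' run the map holds deltas to apply on top of existing
      // rows; in a 'reprocess' run the table was truncated, so the map holds
      // absolute counts and rows can be inserted directly.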
if (!isDbTruncated) {
// Get the current count from database and update
Record3<String, String, Long> recordToFind =
dslContext.newRecord(
FILE_COUNT_BY_SIZE.VOLUME,
FILE_COUNT_BY_SIZE.BUCKET,
FILE_COUNT_BY_SIZE.FILE_SIZE)
.value1(key.volume)
.value2(key.bucket)
.value3(key.fileSizeUpperBound);
FileCountBySize fileCountRecord =
fileCountBySizeDao.findById(recordToFind);
if (fileCountRecord == null && newRecord.getCount() > 0L) {
// insert new row only for non-zero counts.
fileCountBySizeDao.insert(newRecord);
} else if (fileCountRecord != null) {
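        // The row already exists: apply the in-memory delta to the stored
        // count.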
newRecord.setCount(fileCountRecord.getCount() +
fileSizeCountMap.get(key));
fileCountBySizeDao.update(newRecord);
}
} else if (newRecord.getCount() > 0) {
// insert new row only for non-zero counts.
fileCountBySizeDao.insert(newRecord);
}
});
  }

  private FileSizeCountKey getFileSizeCountKey(OmKeyInfo omKeyInfo) {
return new FileSizeCountKey(omKeyInfo.getVolumeName(),
omKeyInfo.getBucketName(),
getFileSizeUpperBound(omKeyInfo.getDataSize()));
}

  /**
   * Increment the count tracked in fileSizeCountMap for the file size bin
   * of the given key. Used by reprocess() and process().
   *
   * @param omKeyInfo OmKeyInfo of the key being put.
   * @param fileSizeCountMap map in which the counts are accumulated.
   */
private void handlePutKeyEvent(OmKeyInfo omKeyInfo,
Map<FileSizeCountKey, Long> fileSizeCountMap) {
FileSizeCountKey key = getFileSizeCountKey(omKeyInfo);
Long count = fileSizeCountMap.containsKey(key) ?
fileSizeCountMap.get(key) + 1L : 1L;
fileSizeCountMap.put(key, count);
}

  /**
   * Decrement the count tracked in fileSizeCountMap for the file size bin
   * of the deleted key. Used by reprocess() and process().
   *
   * @param key OM key name, used for logging when the value is missing.
   * @param omKeyInfo OmKeyInfo of the key being deleted.
   * @param fileSizeCountMap map in which the counts are accumulated.
   */
private void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo,
Map<FileSizeCountKey, Long>
fileSizeCountMap) {
if (omKeyInfo == null) {
LOG.warn("Deleting a key not found while handling DELETE key event. Key" +
" not found in Recon OM DB : {}", key);
} else {
FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo);
Long count = fileSizeCountMap.containsKey(countKey) ?
fileSizeCountMap.get(countKey) - 1L : -1L;
fileSizeCountMap.put(countKey, count);
}
}
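
  /**
   * Composite key (volume, bucket, fileSizeUpperBound) used to aggregate
   * counts in a HashMap before they are written to the DB. equals() and
   * hashCode() are overridden so instances behave correctly as map keys.
   */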
private static class FileSizeCountKey {
private String volume;
private String bucket;
    private Long fileSizeUpperBound;

    FileSizeCountKey(String volume, String bucket,
Long fileSizeUpperBound) {
this.volume = volume;
this.bucket = bucket;
this.fileSizeUpperBound = fileSizeUpperBound;
    }

    @Override
public boolean equals(Object obj) {
      if (obj instanceof FileSizeCountKey) {
FileSizeCountKey s = (FileSizeCountKey) obj;
return volume.equals(s.volume) && bucket.equals(s.bucket) &&
fileSizeUpperBound.equals(s.fileSizeUpperBound);
}
return false;
}

    @Override
    public int hashCode() {
      // Hash the fields individually to avoid collisions between keys whose
      // concatenated string representations happen to match.
      return Objects.hash(volume, bucket, fileSizeUpperBound);
    }
}
}