/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon.tasks;
import com.google.inject.Inject;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
import org.jooq.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
import static org.apache.hadoop.ozone.recon.tasks.
OMDBUpdateEvent.OMDBUpdateAction.DELETE;
import static org.apache.hadoop.ozone.recon.tasks.
OMDBUpdateEvent.OMDBUpdateAction.PUT;
/**
 * Class to iterate over the OM DB and store the counts of existing/new
 * files, binned into power-of-two size ranges (1 KB, 2 KB, 4 KB, ..., 1 PB
 * and above), in the Recon fileSize DB.
 */
public class FileSizeCountTask implements ReconDBUpdateTask {
private static final Logger LOG =
LoggerFactory.getLogger(FileSizeCountTask.class);
private int maxBinSize = -1;
private long maxFileSizeUpperBound = 1125899906842624L; // 1 PB
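// One counter per size bin; index i corresponds to the upper bound
// 2^(10 + i) bytes, and the last slot counts files of 1 PB and above.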
private long[] upperBoundCount;
private long oneKb = 1024L;
private FileCountBySizeDao fileCountBySizeDao;
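/**
 * Initialize the dao with the injected SQL configuration and allocate
 * one counter for every file size bin.
 */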
@Inject
public FileSizeCountTask(Configuration sqlConfiguration) {
fileCountBySizeDao = new FileCountBySizeDao(sqlConfiguration);
upperBoundCount = new long[getMaxBinSize()];
}
long getOneKB() {
return oneKb;
}
long getMaxFileSizeUpperBound() {
return maxFileSizeUpperBound;
}
int getMaxBinSize() {
if (maxBinSize == -1) {
// extra bin to add files > 1PB.
// 1 KB (2 ^ 10) is the smallest tracked file.
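// With a 1 PB upper bound (2^50), this yields 51 - 10 + 1 = 42 bins.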
maxBinSize = nextClosestPowerIndexOfTwo(maxFileSizeUpperBound) - 10 + 1;
}
return maxBinSize;
}
/**
 * Read all the keys from the OM snapshot DB and, for each key, increment
 * the count of the file size bin it belongs to.
 *
 * @param omMetadataManager OM Metadata instance.
 * @return Pair of task name and whether the run succeeded.
 */
@Override
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable();
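// Full scan of the key table: every existing key is counted as a PUT
// into its size bin, then the aggregated counts are flushed to the
// Recon DB via the dao.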
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
while (keyIter.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
// reprocess() is a PUT operation on the DB.
updateUpperBoundCount(kv.getValue(), PUT);
}
} catch (IOException ioEx) {
LOG.error("Unable to populate File Size Count in Recon DB. ", ioEx);
return new ImmutablePair<>(getTaskName(), false);
}
populateFileCountBySizeDB();
LOG.info("Completed a 'reprocess' run of FileSizeCountTask.");
return new ImmutablePair<>(getTaskName(), true);
}
@Override
public String getTaskName() {
return "FileSizeCountTask";
}
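/**
 * This task listens only to updates on the OM key table.
 */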
@Override
public Collection<String> getTaskTables() {
return Collections.singletonList(KEY_TABLE);
}
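/**
 * Seed the in-memory bin counters from the rows already stored in the
 * Recon fileSize table, so that incremental updates apply on top of them.
 */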
private void updateCountFromDB() {
// Rows are expected in ascending order of file size upper bound,
// so the row order matches the bin index order of upperBoundCount.
List<FileCountBySize> resultSet = fileCountBySizeDao.findAll();
int index = 0;
if (resultSet != null) {
for (FileCountBySize row : resultSet) {
upperBoundCount[index] = row.getCount();
index++;
}
}
}
/**
 * Read the keys from OM update events and adjust the count of files
 * in the corresponding size bins.
 *
 * @param events Update events - PUT/DELETE.
 * @return Pair of task name and whether the run succeeded.
 */
@Override
public Pair<String, Boolean> process(OMUpdateEventBatch events) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
// Update the in-memory array with the file size counts from the Recon DB.
updateCountFromDB();
while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
String updatedKey = omdbUpdateEvent.getKey();
OmKeyInfo omKeyInfo = omdbUpdateEvent.getValue();
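// A PUT increments and a DELETE decrements the bin matching the key's data size.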
try {
switch (omdbUpdateEvent.getAction()) {
case PUT:
updateUpperBoundCount(omKeyInfo, PUT);
break;
case DELETE:
updateUpperBoundCount(omKeyInfo, DELETE);
break;
default:
LOG.trace("Skipping DB update event : {}", omdbUpdateEvent.getAction());
}
} catch (IOException e) {
LOG.error("Unexpected exception while updating key data : {}",
updatedKey, e);
return new ImmutablePair<>(getTaskName(), false);
}
}
// Write the aggregated counts to the Recon DB once per event batch.
populateFileCountBySizeDB();
LOG.info("Completed a 'process' run of FileSizeCountTask.");
return new ImmutablePair<>(getTaskName(), true);
}
/**
 * Calculate the bin index based on the size of the key.
 * The index is derived from the number of right shifts needed until
 * dataSize becomes zero, offset by 10 since 1 KB (2^10) is the smallest
 * tracked size. Sizes at or above 1 PB fall into the last bin.
 *
 * @param dataSize Size of the key.
 * @return bin index in upperBoundCount.
 */
public int calculateBinIndex(long dataSize) {
if (dataSize >= getMaxFileSizeUpperBound()) {
return getMaxBinSize() - 1;
}
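// Example: a 3000 byte key needs 12 right shifts to reach zero,
// so it falls in bin index 12 - 10 = 2 (the 4 KB upper bound).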
int index = nextClosestPowerIndexOfTwo(dataSize);
// The smallest file size being tracked for count
// is 1 KB i.e. 1024 = 2 ^ 10.
return index < 10 ? 0 : index - 10;
}
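/**
 * Returns the number of right shifts needed to reduce dataSize to zero,
 * i.e. one more than the position of its highest set bit
 * (e.g. 1500 -> 11, since 2^11 = 2048 is the next power of two).
 */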
int nextClosestPowerIndexOfTwo(long dataSize) {
int index = 0;
while (dataSize != 0) {
dataSize >>= 1;
index += 1;
}
return index;
}
/**
 * Write the in-memory bin counts to the Recon DB through the
 * FileCountBySize dao, inserting new rows or updating existing ones.
 */
void populateFileCountBySizeDB() {
for (int i = 0; i < upperBoundCount.length; i++) {
long fileSizeUpperBound = (i == upperBoundCount.length - 1) ?
Long.MAX_VALUE : (long) Math.pow(2, (10 + i));
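// Upsert: insert a new row for this upper bound if absent, otherwise
// update its count.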
FileCountBySize fileCountRecord =
fileCountBySizeDao.findById(fileSizeUpperBound);
FileCountBySize newRecord =
new FileCountBySize(fileSizeUpperBound, upperBoundCount[i]);
if (fileCountRecord == null) {
fileCountBySizeDao.insert(newRecord);
} else {
fileCountBySizeDao.update(newRecord);
}
}
}
/**
 * Increment or decrement the count in upperBoundCount[] for the bin
 * matching the key's data size.
 * Used by reprocess() and process().
 *
 * @param omKeyInfo key whose count is being updated.
 * @param operation PUT or DELETE.
 */
void updateUpperBoundCount(OmKeyInfo omKeyInfo,
OMDBUpdateEvent.OMDBUpdateAction operation) throws IOException {
int binIndex = calculateBinIndex(omKeyInfo.getDataSize());
if (operation == PUT) {
upperBoundCount[binIndex]++;
} else if (operation == DELETE) {
if (upperBoundCount[binIndex] != 0) {
//decrement only if it had files before, default DB value is 0
upperBoundCount[binIndex]--;
} else {
LOG.warn("Unexpected error while updating bin count. Found 0 count for "
+ "index : {} while processing DELETE event for {}.",
binIndex, omKeyInfo.getKeyName());
}
}
}
}