/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon.tasks;

import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.inject.Inject;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class to iterate over the OM DB and populate the Recon container DB with
 * the container -> key reverse mapping.
*/
public class ContainerKeyMapperTask implements ReconOmTask {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerKeyMapperTask.class);
  private final ReconContainerMetadataManager reconContainerMetadataManager;
  private final long containerKeyFlushToDBMaxThreshold;
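
  /**
   * @param reconContainerMetadataManager Recon container metadata manager
   *                                      used to store the reverse mapping.
   * @param configuration Ozone configuration, used to read the flush
   *                      threshold for the in-memory container-key map.
   */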
@Inject
  public ContainerKeyMapperTask(
      ReconContainerMetadataManager reconContainerMetadataManager,
      OzoneConfiguration configuration) {
this.reconContainerMetadataManager = reconContainerMetadataManager;
this.containerKeyFlushToDBMaxThreshold = configuration.getLong(
ReconServerConfigKeys.
OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD,
ReconServerConfigKeys.
OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT
);
}
  /**
   * Read Key -> ContainerId data from the OM snapshot DB and write the
   * reverse map (container, key) -> count to the Recon container DB.
   *
   * @param omMetadataManager OM metadata manager (snapshot) to iterate.
   * @return pair of task name and success flag.
   */
@Override
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
long omKeyCount = 0;
// In-memory maps for fast look up and batch write
// (container, key) -> count
Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
// containerId -> key count
Map<Long, Long> containerKeyCountMap = new HashMap<>();
try {
LOG.info("Starting a 'reprocess' run of ContainerKeyMapperTask.");
Instant start = Instant.now();
// initialize new container DB
reconContainerMetadataManager
.reinitWithNewContainerDataFromOm(new HashMap<>());
// loop over both key table and file table
for (BucketLayout layout : Arrays.asList(BucketLayout.LEGACY,
BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
      // (HDDS-8580) Since "reprocess" iterates over the whole key table,
      // containerKeyMap needs to be incrementally flushed to the DB once it
      // reaches the configured batch threshold.
      // containerKeyCountMap can be flushed at the end since the number of
      // containers in a cluster does not incur significant memory overhead.
Table<String, OmKeyInfo> omKeyInfoTable =
omMetadataManager.getKeyTable(layout);
      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
               keyIter = omKeyInfoTable.iterator()) {
while (keyIter.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
OmKeyInfo omKeyInfo = kv.getValue();
handleKeyReprocess(kv.getKey(), omKeyInfo, containerKeyMap,
containerKeyCountMap);
if (!checkAndCallFlushToDB(containerKeyMap)) {
LOG.error("Unable to flush containerKey information to the DB");
return new ImmutablePair<>(getTaskName(), false);
}
omKeyCount++;
}
}
}
      // Flush and commit the leftover keys at the end, and batch write
      // containerKeyCountMap to the containerKeyCountTable.
if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap,
containerKeyCountMap)) {
LOG.error("Unable to flush Container Key Count and " +
"remaining Container Key information to the DB");
return new ImmutablePair<>(getTaskName(), false);
}
LOG.info("Completed 'reprocess' of ContainerKeyMapperTask.");
Instant end = Instant.now();
long duration = Duration.between(start, end).toMillis();
LOG.info("It took me {} seconds to process {} keys.",
(double) duration / 1000.0, omKeyCount);
} catch (IOException ioEx) {
LOG.error("Unable to populate Container Key data in Recon DB. ",
ioEx);
return new ImmutablePair<>(getTaskName(), false);
}
return new ImmutablePair<>(getTaskName(), true);
}
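
  /**
   * Write both in-memory maps to the Recon DB in a single batch operation
   * and clear them on success.
   *
   * @param containerKeyMap (container, key) -> count entries to flush.
   * @param containerKeyCountMap containerId -> key count entries to flush.
   * @return true if the flush and commit succeeded, false otherwise.
   */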
private boolean flushAndCommitContainerKeyInfoToDB(
Map<ContainerKeyPrefix, Integer> containerKeyMap,
Map<Long, Long> containerKeyCountMap) {
try {
      // The deleted-container list is not needed since "reprocess" only
      // issues put operations.
writeToTheDB(containerKeyMap, containerKeyCountMap,
Collections.emptyList());
containerKeyMap.clear();
containerKeyCountMap.clear();
} catch (IOException e) {
LOG.error("Unable to write Container Key and " +
"Container Key Count data in Recon DB.", e);
return false;
}
return true;
}
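
  /**
   * Flush containerKeyMap to the DB once it reaches the configured
   * threshold (OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD).
   *
   * @param containerKeyMap (container, key) -> count entries.
   * @return true if no flush was needed or the flush succeeded.
   */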
private boolean checkAndCallFlushToDB(
Map<ContainerKeyPrefix, Integer> containerKeyMap) {
    // If containerKeyMap has reached the configured threshold, flush the
    // entries to the DB and clear the map.
if (null != containerKeyMap && containerKeyMap.size() >=
containerKeyFlushToDBMaxThreshold) {
return flushAndCommitContainerKeyInfoToDB(containerKeyMap,
Collections.emptyMap());
}
return true;
}
@Override
public String getTaskName() {
return "ContainerKeyMapperTask";
}
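
  /**
   * Return the OM DB tables this task listens on: the key table and the
   * file table.
   */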
public Collection<String> getTaskTables() {
List<String> taskTables = new ArrayList<>();
taskTables.add(KEY_TABLE);
taskTables.add(FILE_TABLE);
return taskTables;
}
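
  /**
   * Process a batch of OM DB update events, staging the resulting
   * (container, key) -> count changes in memory and writing them to the
   * Recon DB in a single batch at the end.
   *
   * @param events batch of OM DB update events.
   * @return pair of task name and success flag.
   */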
@Override
public Pair<String, Boolean> process(OMUpdateEventBatch events) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
int eventCount = 0;
final Collection<String> taskTables = getTaskTables();
// In-memory maps for fast look up and batch write
    // (HDDS-8580) containerKeyMap may be used in "process" without
    // incremental batching since the maximum number of keys per batch is
    // bounded by the delta limit configurations.
// (container, key) -> count
Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
// containerId -> key count
Map<Long, Long> containerKeyCountMap = new HashMap<>();
    // List of the deleted (container, key) pairs
List<ContainerKeyPrefix> deletedKeyCountList = new ArrayList<>();
while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
      // Filter events inside the process method to avoid duplicate handling.
if (!taskTables.contains(omdbUpdateEvent.getTable())) {
continue;
}
String updatedKey = omdbUpdateEvent.getKey();
OmKeyInfo updatedKeyValue = omdbUpdateEvent.getValue();
try {
switch (omdbUpdateEvent.getAction()) {
case PUT:
handlePutOMKeyEvent(updatedKey, updatedKeyValue, containerKeyMap,
containerKeyCountMap, deletedKeyCountList);
break;
case DELETE:
handleDeleteOMKeyEvent(updatedKey, containerKeyMap,
containerKeyCountMap, deletedKeyCountList);
break;
case UPDATE:
if (omdbUpdateEvent.getOldValue() != null) {
handleDeleteOMKeyEvent(
omdbUpdateEvent.getOldValue().getKeyName(), containerKeyMap,
containerKeyCountMap, deletedKeyCountList);
} else {
LOG.warn("Update event does not have the old Key Info for {}.",
updatedKey);
}
handlePutOMKeyEvent(updatedKey, updatedKeyValue, containerKeyMap,
containerKeyCountMap, deletedKeyCountList);
break;
default: LOG.debug("Skipping DB update event : {}",
omdbUpdateEvent.getAction());
}
eventCount++;
} catch (IOException e) {
LOG.error("Unexpected exception while updating key data : {} ",
updatedKey, e);
return new ImmutablePair<>(getTaskName(), false);
}
}
try {
writeToTheDB(containerKeyMap, containerKeyCountMap, deletedKeyCountList);
} catch (IOException e) {
LOG.error("Unable to write Container Key Prefix data in Recon DB.", e);
return new ImmutablePair<>(getTaskName(), false);
}
LOG.info("{} successfully processed {} OM DB update event(s).",
getTaskName(), eventCount);
return new ImmutablePair<>(getTaskName(), true);
}
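
  /**
   * Write the accumulated container key mappings, container key counts and
   * deleted container key entries to the Recon DB in one batch operation.
   *
   * @param containerKeyMap (container, key) -> count entries to store.
   * @param containerKeyCountMap containerId -> key count entries to store.
   * @param deletedContainerKeyList (container, key) entries to delete.
   * @throws IOException if the batch operation cannot be created or
   *                     committed.
   */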
private void writeToTheDB(Map<ContainerKeyPrefix, Integer> containerKeyMap,
Map<Long, Long> containerKeyCountMap,
List<ContainerKeyPrefix> deletedContainerKeyList)
throws IOException {
try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
      containerKeyMap.forEach((ContainerKeyPrefix key, Integer count) -> {
        try {
          reconContainerMetadataManager
              .batchStoreContainerKeyMapping(rdbBatchOperation, key, count);
        } catch (IOException e) {
          LOG.error("Unable to write Container Key Prefix data in Recon DB.",
              e);
        }
      });
      containerKeyCountMap.forEach((Long containerId, Long keyCount) -> {
        try {
          reconContainerMetadataManager
              .batchStoreContainerKeyCounts(rdbBatchOperation, containerId,
                  keyCount);
        } catch (IOException e) {
          LOG.error("Unable to write Container Key Count data in Recon DB.",
              e);
        }
      });
      deletedContainerKeyList.forEach((ContainerKeyPrefix key) -> {
        try {
          reconContainerMetadataManager
              .batchDeleteContainerMapping(rdbBatchOperation, key);
        } catch (IOException e) {
          LOG.error("Unable to delete Container Key Prefix data from " +
              "Recon DB.", e);
        }
      });
reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation);
}
}
  /**
   * Note the deletion of an OM key: stage the matching (container, key)
   * entries for batch deletion and decrement the containerID -> key count
   * map accordingly.
   *
   * @param key key String.
   * @param containerKeyMap in-memory map of containerKeys added in this
   *                        batch.
   * @param containerKeyCountMap in-memory map of containerKey counts.
   * @param deletedContainerKeyList list of the deleted containerKeys.
   * @throws IOException if unable to read from the container DB.
   */
  private void handleDeleteOMKeyEvent(String key,
      Map<ContainerKeyPrefix, Integer> containerKeyMap,
      Map<Long, Long> containerKeyCountMap,
      List<ContainerKeyPrefix> deletedContainerKeyList)
      throws IOException {
Set<ContainerKeyPrefix> keysToBeDeleted = new HashSet<>();
try (TableIterator<KeyPrefixContainer, ? extends
Table.KeyValue<KeyPrefixContainer, Integer>> keyContainerIterator =
reconContainerMetadataManager.getKeyContainerTableIterator()) {
      // Find all containers that this key maps to in the DB; entries with
      // the same key prefix are adjacent, so stop once the prefix changes.
keyContainerIterator.seek(KeyPrefixContainer.get(key));
while (keyContainerIterator.hasNext()) {
Table.KeyValue<KeyPrefixContainer, Integer> keyValue =
keyContainerIterator.next();
String keyPrefix = keyValue.getKey().getKeyPrefix();
if (keyPrefix.equals(key)) {
if (keyValue.getKey().getContainerId() != -1) {
keysToBeDeleted.add(keyValue.getKey().toContainerKeyPrefix());
}
} else {
break;
}
}
}
    // Also pick up entries for this key that were added to containerKeyMap
    // earlier in this batch.
containerKeyMap.keySet()
.forEach((ContainerKeyPrefix containerKeyPrefix) -> {
String keyPrefix = containerKeyPrefix.getKeyPrefix();
if (keyPrefix.equals(key)) {
keysToBeDeleted.add(containerKeyPrefix);
}
});
for (ContainerKeyPrefix containerKeyPrefix : keysToBeDeleted) {
deletedContainerKeyList.add(containerKeyPrefix);
// Remove the container-key prefix from the map if we previously added
// it in this batch (and now we delete it)
containerKeyMap.remove(containerKeyPrefix);
// decrement count and update containerKeyCount.
Long containerID = containerKeyPrefix.getContainerId();
long keyCount;
if (containerKeyCountMap.containsKey(containerID)) {
keyCount = containerKeyCountMap.get(containerID);
} else {
keyCount = reconContainerMetadataManager
.getKeyCountForContainer(containerID);
}
if (keyCount > 0) {
containerKeyCountMap.put(containerID, --keyCount);
}
}
}
  /**
   * Note the addition of an OM key: stage the (container, key) entries for
   * batch write and increment the containerID -> key count map accordingly.
   *
   * @param key key String.
   * @param omKeyInfo omKeyInfo value.
   * @param containerKeyMap in-memory map of containerKeys added in this
   *                        batch.
   * @param containerKeyCountMap in-memory map of containerKey counts.
   * @param deletedContainerKeyList list of containerKeys deleted in this
   *                                batch; a put removes a matching entry.
   * @throws IOException if unable to read from or write to the Recon DB.
   */
  private void handlePutOMKeyEvent(String key, OmKeyInfo omKeyInfo,
      Map<ContainerKeyPrefix, Integer> containerKeyMap,
      Map<Long, Long> containerKeyCountMap,
      List<ContainerKeyPrefix> deletedContainerKeyList)
      throws IOException {
long containerCountToIncrement = 0;
for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo
.getKeyLocationVersions()) {
long keyVersion = omKeyLocationInfoGroup.getVersion();
for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup
.getLocationList()) {
long containerId = omKeyLocationInfo.getContainerID();
ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get(
containerId, key, keyVersion);
if (reconContainerMetadataManager.getCountForContainerKeyPrefix(
containerKeyPrefix) == 0
&& !containerKeyMap.containsKey(containerKeyPrefix)) {
// Save on writes. No need to save same container-key prefix
// mapping again.
containerKeyMap.put(containerKeyPrefix, 1);
// Remove the container-key prefix from the deleted list if we
// previously deleted it in this batch (and now we add it again)
deletedContainerKeyList.remove(containerKeyPrefix);
// check if container already exists and
// increment the count of containers if it does not exist
if (!reconContainerMetadataManager.doesContainerExists(containerId)
&& !containerKeyCountMap.containsKey(containerId)) {
containerCountToIncrement++;
}
// update the count of keys for the given containerID
long keyCount;
if (containerKeyCountMap.containsKey(containerId)) {
keyCount = containerKeyCountMap.get(containerId);
} else {
keyCount = reconContainerMetadataManager
.getKeyCountForContainer(containerId);
}
// increment the count and update containerKeyCount.
// keyCount will be 0 if containerID is not found. So, there is no
// need to initialize keyCount for the first time.
containerKeyCountMap.put(containerId, ++keyCount);
}
}
}
if (containerCountToIncrement > 0) {
reconContainerMetadataManager
.incrementContainerCountBy(containerCountToIncrement);
}
}
/**
 * Write an OM key to the container DB and update the containerID -> key
 * count in the Global Stats table.
*
* @param key key String
* @param omKeyInfo omKeyInfo value
* @param containerKeyMap we keep the added containerKeys in this map
* to allow incremental batching to containerKeyTable
* @param containerKeyCountMap we keep the containerKey counts in this map
* to allow batching to containerKeyCountTable
* after reprocessing is done
* @throws IOException if unable to write to recon DB.
*/
  private void handleKeyReprocess(String key, OmKeyInfo omKeyInfo,
      Map<ContainerKeyPrefix, Integer> containerKeyMap,
      Map<Long, Long> containerKeyCountMap)
      throws IOException {
long containerCountToIncrement = 0;
for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo
.getKeyLocationVersions()) {
long keyVersion = omKeyLocationInfoGroup.getVersion();
for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup
.getLocationList()) {
long containerId = omKeyLocationInfo.getContainerID();
ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get(
containerId, key, keyVersion);
if (reconContainerMetadataManager.getCountForContainerKeyPrefix(
containerKeyPrefix) == 0
&& !containerKeyMap.containsKey(containerKeyPrefix)) {
// Save on writes. No need to save same container-key prefix
// mapping again.
containerKeyMap.put(containerKeyPrefix, 1);
// check if container already exists and
// if it exists, update the count of keys for the given containerID
// else, increment the count of containers and initialize keyCount
long keyCount;
if (containerKeyCountMap.containsKey(containerId)) {
keyCount = containerKeyCountMap.get(containerId);
} else {
containerCountToIncrement++;
keyCount = 0;
}
// increment the count and update containerKeyCount.
containerKeyCountMap.put(containerId, ++keyCount);
}
}
}
if (containerCountToIncrement > 0) {
reconContainerMetadataManager
.incrementContainerCountBy(containerCountToIncrement);
}
}
}