| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.hadoop.ozone.om; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import com.google.protobuf.ServiceException; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; |
| import org.apache.hadoop.ozone.common.BlockGroup; |
| import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; |
| import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; |
| import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest; |
| import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; |
| import org.apache.hadoop.util.Time; |
| import org.apache.hadoop.hdds.utils.BackgroundService; |
| import org.apache.hadoop.hdds.utils.BackgroundTask; |
| import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; |
| import org.apache.hadoop.hdds.utils.BackgroundTaskResult; |
| import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; |
| import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT; |
| |
| import org.apache.hadoop.hdds.utils.db.BatchOperation; |
| import org.apache.hadoop.hdds.utils.db.DBStore; |
| import org.apache.hadoop.hdds.utils.db.Table; |
| import org.apache.ratis.protocol.ClientId; |
| import org.rocksdb.RocksDBException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This is the background service to delete keys. Scan the metadata of om |
| * periodically to get the keys from DeletedTable and ask scm to delete |
| * metadata accordingly, if scm returns success for keys, then clean up those |
| * keys. |
| */ |
| public class KeyDeletingService extends BackgroundService { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(KeyDeletingService.class); |
| |
| // The thread pool size for key deleting service. |
| private final static int KEY_DELETING_CORE_POOL_SIZE = 2; |
| |
| private final OzoneManager ozoneManager; |
| private final ScmBlockLocationProtocol scmClient; |
| private final KeyManager manager; |
| private ClientId clientId = ClientId.randomId(); |
| private final int keyLimitPerTask; |
| private final AtomicLong deletedKeyCount; |
| private final AtomicLong runCount; |
| |
| KeyDeletingService(OzoneManager ozoneManager, |
| ScmBlockLocationProtocol scmClient, |
| KeyManager manager, long serviceInterval, |
| long serviceTimeout, Configuration conf) { |
| super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS, |
| KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); |
| this.ozoneManager = ozoneManager; |
| this.scmClient = scmClient; |
| this.manager = manager; |
| this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK, |
| OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); |
| this.deletedKeyCount = new AtomicLong(0); |
| this.runCount = new AtomicLong(0); |
| } |
| |
| /** |
| * Returns the number of times this Background service has run. |
| * |
| * @return Long, run count. |
| */ |
| @VisibleForTesting |
| public AtomicLong getRunCount() { |
| return runCount; |
| } |
| |
| /** |
| * Returns the number of keys deleted by the background service. |
| * |
| * @return Long count. |
| */ |
| @VisibleForTesting |
| public AtomicLong getDeletedKeyCount() { |
| return deletedKeyCount; |
| } |
| |
| @Override |
| public BackgroundTaskQueue getTasks() { |
| BackgroundTaskQueue queue = new BackgroundTaskQueue(); |
| queue.add(new KeyDeletingTask()); |
| return queue; |
| } |
| |
| private boolean shouldRun() { |
| if (ozoneManager == null) { |
| // OzoneManager can be null for testing |
| return true; |
| } |
| return ozoneManager.isLeader(); |
| } |
| |
| private boolean isRatisEnabled() { |
| if (ozoneManager == null) { |
| return false; |
| } |
| return ozoneManager.isRatisEnabled(); |
| } |
| |
| /** |
| * A key deleting task scans OM DB and looking for a certain number of |
| * pending-deletion keys, sends these keys along with their associated blocks |
| * to SCM for deletion. Once SCM confirms keys are deleted (once SCM persisted |
| * the blocks info in its deletedBlockLog), it removes these keys from the |
| * DB. |
| */ |
| private class KeyDeletingTask implements |
| BackgroundTask<BackgroundTaskResult> { |
| |
| @Override |
| public int getPriority() { |
| return 0; |
| } |
| |
| @Override |
| public BackgroundTaskResult call() throws Exception { |
| // Check if this is the Leader OM. If not leader, no need to execute this |
| // task. |
| if (shouldRun()) { |
| runCount.incrementAndGet(); |
| try { |
| long startTime = Time.monotonicNow(); |
| List<BlockGroup> keyBlocksList = manager |
| .getPendingDeletionKeys(keyLimitPerTask); |
| if (keyBlocksList != null && keyBlocksList.size() > 0) { |
| List<DeleteBlockGroupResult> results = |
| scmClient.deleteKeyBlocks(keyBlocksList); |
| if (results != null) { |
| int delCount; |
| if (isRatisEnabled()) { |
| delCount = submitPurgeKeysRequest(results); |
| } else { |
| // TODO: Once HA and non-HA paths are merged, we should have |
| // only one code path here. Purge keys should go through an |
| // OMRequest model. |
| delCount = deleteAllKeys(results); |
| } |
| LOG.debug("Number of keys deleted: {}, elapsed time: {}ms", |
| delCount, Time.monotonicNow() - startTime); |
| deletedKeyCount.addAndGet(delCount); |
| } |
| } |
| } catch (IOException e) { |
| LOG.error("Error while running delete keys background task. Will " + |
| "retry at next run.", e); |
| } |
| } |
| // By design, no one cares about the results of this call back. |
| return EmptyTaskResult.newResult(); |
| } |
| |
| /** |
| * Deletes all the keys that SCM has acknowledged and queued for delete. |
| * |
| * @param results DeleteBlockGroups returned by SCM. |
| * @throws RocksDBException on Error. |
| * @throws IOException on Error |
| */ |
| private int deleteAllKeys(List<DeleteBlockGroupResult> results) |
| throws RocksDBException, IOException { |
| Table deletedTable = manager.getMetadataManager().getDeletedTable(); |
| |
| DBStore store = manager.getMetadataManager().getStore(); |
| |
| // Put all keys to delete in a single transaction and call for delete. |
| int deletedCount = 0; |
| try (BatchOperation writeBatch = store.initBatchOperation()) { |
| for (DeleteBlockGroupResult result : results) { |
| if (result.isSuccess()) { |
| // Purge key from OM DB. |
| deletedTable.deleteWithBatch(writeBatch, |
| result.getObjectKey()); |
| LOG.debug("Key {} deleted from OM DB", result.getObjectKey()); |
| deletedCount++; |
| } |
| } |
| // Write a single transaction for delete. |
| store.commitBatchOperation(writeBatch); |
| } |
| return deletedCount; |
| } |
| |
| /** |
| * Submits PurgeKeys request for the keys whose blocks have been deleted |
| * by SCM. |
| * |
| * @param results DeleteBlockGroups returned by SCM. |
| * @throws IOException on Error |
| */ |
| public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results) { |
| List<String> purgeKeysList = new ArrayList<>(); |
| |
| // Put all keys to be purged in a list |
| int deletedCount = 0; |
| for (DeleteBlockGroupResult result : results) { |
| if (result.isSuccess()) { |
| // Add key to PurgeKeys list. |
| String deletedKey = result.getObjectKey(); |
| purgeKeysList.add(deletedKey); |
| LOG.debug("Key {} set to be purged from OM DB", deletedKey); |
| deletedCount++; |
| } |
| } |
| |
| PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder() |
| .addAllKeys(purgeKeysList) |
| .build(); |
| |
| OMRequest omRequest = OMRequest.newBuilder() |
| .setCmdType(Type.PurgeKeys) |
| .setPurgeKeysRequest(purgeKeysRequest) |
| .setClientId(clientId.toString()) |
| .build(); |
| |
| // Submit PurgeKeys request to OM |
| try { |
| ozoneManager.getOmServerProtocol().submitRequest(null, omRequest); |
| } catch (ServiceException e) { |
| LOG.error("PurgeKey request failed. Will retry at next run."); |
| return 0; |
| } |
| |
| return deletedCount; |
| } |
| } |
| } |