blob: c4e0953fa35b917fb4466c971bad99925621e43f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.block;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdds.utils.db.TypedTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * {@link DeletedBlockLogStateManager} implementation
 * based on {@link DeletedBlocksTransaction}.
 *
 * <p>All mutations are handed to the supplied {@link DBTransactionBuffer}
 * instead of being written to the table directly. When SCM HA is enabled,
 * two in-memory sets remember the transaction IDs that were removed, or
 * that had their retry count set to -1, since the last {@link #onFlush()}.
 * {@link #getReadOnlyIterator()} skips those IDs even if the underlying
 * table still returns them.
 */
public class DeletedBlockLogStateManagerImpl
    implements DeletedBlockLogStateManager {

  public static final Logger LOG =
      LoggerFactory.getLogger(DeletedBlockLogStateManagerImpl.class);

  // Not final: swapped for a freshly loaded table in reinitialize().
  private Table<Long, DeletedBlocksTransaction> deletedTable;
  private final ContainerManager containerManager;
  private final DBTransactionBuffer transactionBuffer;
  // Both sets are null when SCM HA is disabled (see constructor).
  private final Set<Long> deletingTxIDs;
  private final Set<Long> skippingRetryTxIDs;

  /**
   * Creates the state manager.
   *
   * @param conf configuration used to decide whether SCM HA is enabled
   * @param deletedTable table holding the deleted block transactions
   * @param containerManager receives the per-container delete transaction
   *                         IDs when transactions are added
   * @param txBuffer buffer through which all table mutations are issued
   */
  public DeletedBlockLogStateManagerImpl(ConfigurationSource conf,
      Table<Long, DeletedBlocksTransaction> deletedTable,
      ContainerManager containerManager, DBTransactionBuffer txBuffer) {
    this.deletedTable = deletedTable;
    this.containerManager = containerManager;
    this.transactionBuffer = txBuffer;
    final boolean isRatisEnabled = SCMHAUtils.isSCMHAEnabled(conf);
    // The tracking sets are only allocated in HA mode; everywhere else in
    // this class a null set means "nothing to skip".
    this.deletingTxIDs =
        isRatisEnabled ? ConcurrentHashMap.newKeySet() : null;
    this.skippingRetryTxIDs =
        isRatisEnabled ? ConcurrentHashMap.newKeySet() : null;
  }

  /**
   * Tells whether the iterator must skip the given transaction because it
   * is currently tracked in {@code deletingTxIDs} or
   * {@code skippingRetryTxIDs}.
   */
  private boolean isHiddenFromIterator(long txID) {
    if (deletingTxIDs != null && deletingTxIDs.contains(txID)) {
      return true;
    }
    return skippingRetryTxIDs != null && skippingRetryTxIDs.contains(txID);
  }

  /**
   * Returns a forward-only iterator over the deleted block transactions
   * that skips every transaction tracked in the in-memory sets.
   *
   * <p>Only {@code hasNext()}, {@code next()} and {@code close()} are
   * supported; the seek and remove operations throw
   * {@link UnsupportedOperationException}.
   *
   * @return the filtering iterator; the caller is responsible for
   *         closing it
   * @throws IllegalStateException if the key of a table entry cannot be
   *         read, either here (while looking up the first entry) or later
   *         from {@code next()}
   */
  public TableIterator<Long, TypedTable.KeyValue<Long,
      DeletedBlocksTransaction>> getReadOnlyIterator() {
    return new TableIterator<Long, TypedTable.KeyValue<Long,
        DeletedBlocksTransaction>>() {

      private TableIterator<Long,
          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
          deletedTable.iterator();
      // Look-ahead element; null once the underlying iterator is exhausted.
      private TypedTable.KeyValue<Long, DeletedBlocksTransaction> nextTx;

      {
        // Prime the look-ahead so hasNext() is correct before next().
        findNext();
      }

      /** Advances to the next entry that is not hidden, or to the end. */
      private void findNext() {
        while (iter.hasNext()) {
          TypedTable.KeyValue<Long, DeletedBlocksTransaction> next =
              iter.next();
          long txID;
          try {
            txID = next.getKey();
          } catch (IOException e) {
            // Keep the cause so the underlying read failure is diagnosable.
            throw new IllegalStateException(
                "Failed to read txID from deleted blocks table entry", e);
          }

          if (!isHiddenFromIterator(txID)) {
            nextTx = next;
            if (LOG.isTraceEnabled()) {
              LOG.trace("DeletedBlocksTransaction matching txID:{}",
                  txID);
            }
            return;
          }
        }
        nextTx = null;
      }

      @Override
      public boolean hasNext() {
        return nextTx != null;
      }

      @Override
      public TypedTable.KeyValue<Long, DeletedBlocksTransaction> next() {
        if (nextTx == null) {
          throw new NoSuchElementException("DeletedBlocksTransaction " +
              "Iterator reached end");
        }
        TypedTable.KeyValue<Long, DeletedBlocksTransaction> returnTx = nextTx;
        findNext();
        return returnTx;
      }

      @Override
      public void close() throws IOException {
        iter.close();
      }

      @Override
      public void seekToFirst() {
        throw new UnsupportedOperationException("seekToFirst");
      }

      @Override
      public void seekToLast() {
        throw new UnsupportedOperationException("seekToLast");
      }

      @Override
      public TypedTable.KeyValue<Long, DeletedBlocksTransaction> seek(
          Long key) throws IOException {
        throw new UnsupportedOperationException("seek");
      }

      @Override
      public void removeFromDB() throws IOException {
        throw new UnsupportedOperationException("read-only");
      }
    };
  }

  /**
   * Adds the given transactions to the buffer and reports, for every
   * container touched, the largest transaction ID seen in {@code txs} to
   * the container manager.
   *
   * @param txs transactions to add
   * @throws IOException if buffering a transaction or updating the
   *         container manager fails
   */
  @Override
  public void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs)
      throws IOException {
    Map<ContainerID, Long> containerIdToTxnIdMap = new HashMap<>();
    for (DeletedBlocksTransaction tx : txs) {
      long tid = tx.getTxID();
      // Keep only the highest txID per container.
      containerIdToTxnIdMap.merge(
          ContainerID.valueOf(tx.getContainerID()), tid, Long::max);
      transactionBuffer.addToBuffer(deletedTable, tid, tx);
    }
    containerManager.updateDeleteTransactionId(containerIdToTxnIdMap);
  }

  /**
   * Removes the given transactions through the buffer. In HA mode the IDs
   * are first recorded in {@code deletingTxIDs} so the read-only iterator
   * stops returning them right away.
   *
   * @param txIDs IDs of the transactions to remove
   * @throws IOException if removing from the buffer fails
   */
  @Override
  public void removeTransactionsFromDB(ArrayList<Long> txIDs)
      throws IOException {
    if (deletingTxIDs != null) {
      deletingTxIDs.addAll(txIDs);
    }
    for (Long txID : txIDs) {
      transactionBuffer.removeFromBuffer(deletedTable, txID);
    }
  }

  /**
   * Sets the retry count of each given transaction to -1 through the
   * buffer and, in HA mode, records the ID in {@code skippingRetryTxIDs}.
   * Transactions that are not found in the table are skipped.
   *
   * @param txIDs IDs of the transactions to mark
   * @throws IOException if reading the table or buffering the update fails
   */
  @Override
  public void increaseRetryCountOfTransactionInDB(
      ArrayList<Long> txIDs) throws IOException {
    for (Long txID : txIDs) {
      DeletedBlocksTransaction block =
          deletedTable.get(txID);
      if (block == null) {
        if (LOG.isDebugEnabled()) {
          // This can occur due to race condition between retry and old
          // service task where old task removes the transaction and the new
          // task is resending
          LOG.debug("Deleted TXID {} not found.", txID);
        }
        continue;
      }
      // The count is set to -1 unconditionally: retrying stops and admins
      // can analyze those blocks and purge them manually by SCMCli.
      // NOTE(review): no max-retry comparison happens in this method, so
      // callers are presumably expected to pass only transactions that
      // have exceeded the retry limit -- confirm against the caller.
      DeletedBlocksTransaction.Builder builder =
          block.toBuilder().setCount(-1);
      transactionBuffer.addToBuffer(deletedTable, txID, builder.build());
      if (skippingRetryTxIDs != null) {
        skippingRetryTxIDs.add(txID);
      }
    }
  }

  /**
   * Resets the retry count to 0 for every given transaction whose current
   * count is -1. Transactions that are missing from the table, or whose
   * count is not -1, are logged and skipped.
   *
   * @param txIDs IDs of the transactions to reset; must not be null
   * @return the number of transactions that were actually reset
   * @throws NullPointerException if {@code txIDs} is null
   * @throws IOException if reading the table or buffering an update fails
   */
  @Override
  public int resetRetryCountOfTransactionInDB(ArrayList<Long> txIDs)
      throws IOException {
    Objects.requireNonNull(txIDs, "txIds cannot be null.");
    int resetCount = 0;
    for (long txId: txIDs) {
      try {
        DeletedBlocksTransaction transaction = deletedTable.get(txId);
        if (transaction == null) {
          LOG.warn("txId {} is not found in deletedTable.", txId);
          continue;
        }
        if (transaction.getCount() != -1) {
          LOG.warn("txId {} has already been reset in deletedTable.", txId);
          continue;
        }
        transactionBuffer.addToBuffer(deletedTable, txId,
            transaction.toBuilder().setCount(0).build());
        resetCount += 1;
        if (LOG.isDebugEnabled()) {
          // Per-transaction detail is logged at debug, matching the guard;
          // the total is logged at info after the loop.
          LOG.debug("Reset deleted block Txn retry count to 0 in container {}" +
              " with txnId {} ", transaction.getContainerID(), txId);
        }
      } catch (IOException ex) {
        LOG.error("Could not reset deleted block transaction {}.", txId, ex);
        throw ex;
      }
    }
    LOG.info("Reset in total {} deleted block Txn retry count", resetCount);
    return resetCount;
  }

  /**
   * Clears both in-memory tracking sets.
   *
   * @throws NullPointerException if the sets were not allocated, i.e. SCM
   *         HA was disabled at construction time
   */
  public void onFlush() {
    // onFlush() can be invoked only when ratis is enabled.
    Preconditions.checkNotNull(deletingTxIDs);
    Preconditions.checkNotNull(skippingRetryTxIDs);
    deletingTxIDs.clear();
    skippingRetryTxIDs.clear();
  }

  /**
   * Replaces the deleted blocks table with a newly loaded one.
   *
   * @param deletedBlocksTXTable the table to use from now on
   * @throws IllegalArgumentException if {@code deletingTxIDs} still holds
   *         entries
   */
  @Override
  public void reinitialize(
      Table<Long, DeletedBlocksTransaction> deletedBlocksTXTable) {
    // Before Reinitialization, flush will be called from Ratis StateMachine.
    // Just the DeletedDb will be loaded here.

    // We don't need to handle transactionBuffer, deletingTxIDs
    // and skippingRetryTxIDs here, since onFlush() will be called
    // before reinitialization. Just update deletedTable here.
    // deletingTxIDs is null when HA is disabled; nothing to verify then.
    Preconditions.checkArgument(
        deletingTxIDs == null || deletingTxIDs.isEmpty(),
        "deletingTxIDs must be empty before reinitialization");
    this.deletedTable = deletedBlocksTXTable;
  }

  /**
   * @return a new, empty {@link Builder}
   */
  public static Builder newBuilder() {
    return new Builder();
  }

  /**
   * Builder for {@link DeletedBlockLogStateManager}.
   */
  public static class Builder {
    private ConfigurationSource conf;
    private SCMRatisServer scmRatisServer;
    private Table<Long, DeletedBlocksTransaction> table;
    private DBTransactionBuffer transactionBuffer;
    private ContainerManager containerManager;

    public Builder setConfiguration(final ConfigurationSource config) {
      conf = config;
      return this;
    }

    public Builder setRatisServer(final SCMRatisServer ratisServer) {
      scmRatisServer = ratisServer;
      return this;
    }

    public Builder setDeletedBlocksTable(
        final Table<Long, DeletedBlocksTransaction> deletedBlocksTable) {
      table = deletedBlocksTable;
      return this;
    }

    public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) {
      this.transactionBuffer = buffer;
      return this;
    }

    public Builder setContainerManager(ContainerManager contManager) {
      this.containerManager = contManager;
      return this;
    }

    /**
     * Builds the state manager and wraps it in a dynamic proxy whose calls
     * are dispatched through an {@link SCMHAInvocationHandler}.
     *
     * @return a proxy implementing {@link DeletedBlockLogStateManager}
     * @throws NullPointerException if the configuration or the deleted
     *         blocks table has not been set
     */
    public DeletedBlockLogStateManager build() {
      Preconditions.checkNotNull(conf);
      Preconditions.checkNotNull(table);

      final DeletedBlockLogStateManager impl =
          new DeletedBlockLogStateManagerImpl(conf, table, containerManager,
              transactionBuffer);

      final SCMHAInvocationHandler invocationHandler =
          new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.BLOCK,
              impl, scmRatisServer);

      return (DeletedBlockLogStateManager) Proxy.newProxyInstance(
          SCMHAInvocationHandler.class.getClassLoader(),
          new Class<?>[]{DeletedBlockLogStateManager.class},
          invocationHandler);
    }
  }
}