blob: dca469ac9b7f77872de1c29d4eb8774dc4f61e9c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* <p>http://www.apache.org/licenses/LICENSE-2.0
* <p>
* <p>Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.ha;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.HAUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.hdds.ExitManager;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
/**
 * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
 * node Ratis ring. The Ratis ring will have one Leader node and 2N follower
 * nodes.
 *
 * <p>When SCM HA is not enabled in the configuration, no Ratis server,
 * snapshot provider or inter-SCM gRPC service is created; only a plain
 * {@link SCMDBTransactionBufferImpl} is kept and {@link #start()} /
 * {@link #shutdown()} are no-ops.
 */
public class SCMHAManagerImpl implements SCMHAManager {

  private static final Logger LOG =
      LoggerFactory.getLogger(SCMHAManagerImpl.class);

  private final SCMRatisServer ratisServer;
  private final ConfigurationSource conf;
  private final DBTransactionBuffer transactionBuffer;
  private final SCMSnapshotProvider scmSnapshotProvider;
  private final StorageContainerManager scm;
  // Invoked when the SCM state cannot be reloaded after a checkpoint
  // install. Not final so that tests can substitute their own instance.
  private ExitManager exitManager;

  // this should ideally be started only in a ratis leader
  private final InterSCMGrpcProtocolService grpcServer;

  /**
   * Creates SCMHAManager instance.
   *
   * @param conf configuration used to decide whether SCM HA is enabled and
   *             to build the HA components
   * @param scm  the StorageContainerManager this HA manager belongs to
   * @throws IOException declared by the signature (presumably raised while
   *                     building the HA components; not visible here)
   */
  public SCMHAManagerImpl(final ConfigurationSource conf,
      final StorageContainerManager scm) throws IOException {
    this.conf = conf;
    this.scm = scm;
    // Always have a usable exit manager; previously this field stayed null
    // unless a test injected one, which turned the fatal-exit path in
    // installCheckpoint into a NullPointerException.
    this.exitManager = new ExitManager();
    if (SCMHAUtils.isSCMHAEnabled(conf)) {
      this.transactionBuffer = new SCMHADBTransactionBufferImpl(scm);
      this.ratisServer = new SCMRatisServerImpl(conf, scm,
          (SCMHADBTransactionBuffer) transactionBuffer);
      this.scmSnapshotProvider = new SCMSnapshotProvider(conf,
          scm.getSCMHANodeDetails().getPeerNodeDetails());
      this.grpcServer = new InterSCMGrpcProtocolService(conf, scm);
    } else {
      this.transactionBuffer = new SCMDBTransactionBufferImpl();
      this.scmSnapshotProvider = null;
      this.grpcServer = null;
      this.ratisServer = null;
    }
  }

  /**
   * {@inheritDoc}
   *
   * <p>Does nothing when HA is disabled (no Ratis server). Otherwise starts
   * the Ratis server, asks to be added to the existing ring if the local
   * Ratis group has no peers, and finally starts the inter-SCM gRPC service.
   *
   * @throws IOException if this SCM could not be added to the existing HA
   *                     group, or if an underlying call fails
   */
  @Override
  public void start() throws IOException {
    if (ratisServer == null) {
      return;
    }
    ratisServer.start();
    if (ratisServer.getDivision().getGroup().getPeers().isEmpty()) {
      // this is a bootstrapped node
      // It will first try to add itself to existing ring
      boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf),
          new AddSCMRequest.Builder().setClusterId(scm.getClusterId())
              .setScmId(scm.getScmId())
              .setRatisAddr(scm.getSCMHANodeDetails().getLocalNodeDetails()
                  // TODO : Should we use IP instead of hostname??
                  .getRatisHostPortStr()).build(), scm.getSCMNodeId());
      if (!success) {
        throw new IOException("Adding SCM to existing HA group failed");
      }
    } else {
      LOG.info(" scm role is {} peers {}",
          ratisServer.getDivision().getInfo().getCurrentRole(),
          ratisServer.getDivision().getGroup().getPeers());
    }
    grpcServer.start();
  }

  /**
   * @return the Ratis server, or null when SCM HA is disabled.
   */
  public SCMRatisServer getRatisServer() {
    return ratisServer;
  }

  /**
   * @return the DB transaction buffer chosen at construction time.
   */
  @Override
  public DBTransactionBuffer getDBTransactionBuffer() {
    return transactionBuffer;
  }

  /**
   * @return the snapshot provider, or null when SCM HA is disabled.
   */
  @Override
  public SCMSnapshotProvider getSCMSnapshotProvider() {
    return scmSnapshotProvider;
  }

  /**
   * Returns the transaction buffer as an {@link SCMHADBTransactionBuffer}.
   *
   * @throws IllegalArgumentException if the buffer is not an HA buffer
   *                                  (i.e. HA is disabled)
   */
  @Override
  public SCMHADBTransactionBuffer asSCMHADBTransactionBuffer() {
    Preconditions
        .checkArgument(transactionBuffer instanceof SCMHADBTransactionBuffer);
    return (SCMHADBTransactionBuffer) transactionBuffer;
  }

  /**
   * Download and install latest checkpoint from leader SCM.
   *
   * @param leaderId peerNodeID of the leader SCM
   * @return If checkpoint is installed successfully, return the
   *         corresponding termIndex. Otherwise, return null.
   */
  public TermIndex installSnapshotFromLeader(String leaderId) {
    if (scmSnapshotProvider == null) {
      LOG.error("SCM Snapshot Provider is not configured as there are no peer "
          + "nodes.");
      return null;
    }

    DBCheckpoint dBCheckpoint = getDBCheckpointFromLeader(leaderId);
    if (dBCheckpoint == null) {
      // The download failure has already been logged by
      // getDBCheckpointFromLeader; nothing can be installed.
      return null;
    }
    LOG.info("Downloaded checkpoint from Leader {} to the location {}",
        leaderId, dBCheckpoint.getCheckpointLocation());

    TermIndex termIndex = null;
    try {
      termIndex = installCheckpoint(leaderId, dBCheckpoint);
    } catch (Exception ex) {
      LOG.error("Failed to install snapshot from Leader SCM.", ex);
    }
    return termIndex;
  }

  /**
   * Download the latest SCM DB checkpoint from the leader SCM.
   *
   * @param leaderId SCMNodeID of the leader SCM node.
   * @return latest DB checkpoint from leader SCM, or null if the download
   *         failed with an IOException.
   */
  private DBCheckpoint getDBCheckpointFromLeader(String leaderId) {
    LOG.info("Downloading checkpoint from leader SCM {} and reloading state " +
        "from the checkpoint.", leaderId);
    try {
      return scmSnapshotProvider.getSCMDBSnapshot(leaderId);
    } catch (IOException e) {
      LOG.error("Failed to download checkpoint from SCM leader {}", leaderId,
          e);
    }
    return null;
  }

  /**
   * Install checkpoint. If the checkpoints snapshot index is greater than
   * SCM's last applied transaction index, then re-initialize the SCM
   * state via this checkpoint. Before re-initializing SCM state, the SCM Ratis
   * server should be stopped so that no new transactions can be applied.
   *
   * <p>This overload reads the transaction info out of the checkpoint and
   * delegates to
   * {@link #installCheckpoint(String, Path, TransactionInfo)}.
   *
   * @param leaderId     id of the leader SCM the checkpoint came from
   * @param dbCheckpoint the downloaded checkpoint
   * @return the TermIndex of the installed checkpoint, or null if it was
   *         not installed
   */
  @VisibleForTesting
  public TermIndex installCheckpoint(String leaderId, DBCheckpoint dbCheckpoint)
      throws Exception {
    Path checkpointLocation = dbCheckpoint.getCheckpointLocation();
    TransactionInfo checkpointTrxnInfo = HAUtils
        .getTrxnInfoFromCheckpoint(OzoneConfiguration.of(conf),
            checkpointLocation, new SCMDBDefinition());

    LOG.info("Installing checkpoint with SCMTransactionInfo {}",
        checkpointTrxnInfo);

    return installCheckpoint(leaderId, checkpointLocation, checkpointTrxnInfo);
  }

  /**
   * Installs the checkpoint at the given location if its transaction info
   * is accepted by {@code HAUtils.verifyTransactionInfo} against the state
   * machine's last applied index.
   *
   * @param leaderId           id of the leader SCM the checkpoint came from
   * @param checkpointLocation directory holding the downloaded checkpoint
   * @param checkpointTrxnInfo transaction info stored in the checkpoint
   * @return the TermIndex of the installed checkpoint, or null when the
   *         checkpoint was rejected or the DB could not be replaced and the
   *         old state was reloaded
   * @throws Exception if stopping services or pausing the state machine
   *                   fails
   */
  public TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
      TransactionInfo checkpointTrxnInfo) throws Exception {

    File dbBackup = null;
    TermIndex termIndex =
        getRatisServer().getSCMStateMachine().getLastAppliedTermIndex();
    long term = termIndex.getTerm();
    long lastAppliedIndex = termIndex.getIndex();
    // Check if current applied log index is smaller than the downloaded
    // checkpoint transaction index. If yes, proceed by stopping the ratis
    // server so that the SCM state can be re-initialized. If no then do not
    // proceed with installSnapshot.
    boolean canProceed = HAUtils
        .verifyTransactionInfo(checkpointTrxnInfo, lastAppliedIndex, leaderId,
            checkpointLocation, LOG);
    File oldDBLocation = scm.getScmMetadataStore().getStore().getDbLocation();
    if (canProceed) {
      try {
        // Stop services
        stopServices();

        // Pause the State Machine so that no new transactions can be applied.
        // This action also clears the SCM Double Buffer so that if there
        // are any pending transactions in the buffer, they are discarded.
        getRatisServer().getSCMStateMachine().pause();
      } catch (Exception e) {
        LOG.error("Failed to stop/ pause the services. Cannot proceed with "
            + "installing the new checkpoint.", e);
        startServices();
        throw e;
      }
      try {
        dbBackup = HAUtils
            .replaceDBWithCheckpoint(lastAppliedIndex, oldDBLocation,
                checkpointLocation, OzoneConsts.SCM_DB_BACKUP_PREFIX);
        term = checkpointTrxnInfo.getTerm();
        lastAppliedIndex = checkpointTrxnInfo.getTransactionIndex();
        LOG.info(
            "Replaced DB with checkpoint from SCM: {}, term: {}, index: {}",
            leaderId, term, lastAppliedIndex);
      } catch (Exception e) {
        // term / lastAppliedIndex keep their old values here, so the reload
        // below restarts from the previous state and null is returned.
        LOG.error("Failed to install Snapshot from {} as SCM failed to replace"
            + " DB with downloaded checkpoint. Reloading old SCM state.",
            leaderId, e);
      }
      // Reload the DB store with the new checkpoint.
      // Restart (unpause) the state machine and update its last applied index
      // to the installed checkpoint's snapshot index.
      try {
        reloadSCMState();
        getRatisServer().getSCMStateMachine().unpause(term, lastAppliedIndex);
        LOG.info("Reloaded SCM state with Term: {} and Index: {}", term,
            lastAppliedIndex);
      } catch (Exception ex) {
        String errorMsg =
            "Failed to reload SCM state and instantiate services.";
        exitManager.exitSystem(1, errorMsg, ex, LOG);
      }

      // Delete the backup DB
      try {
        if (dbBackup != null) {
          FileUtils.deleteFully(dbBackup);
        }
      } catch (Exception e) {
        LOG.error("Failed to delete the backup of the original DB {}",
            dbBackup);
      }
    } else {
      LOG.warn("Cannot proceed with InstallSnapshot as SCM is at TermIndex {} "
          + "and checkpoint has lower TermIndex {}. Reloading old "
          + "state of SCM.", termIndex, checkpointTrxnInfo.getTermIndex());
    }

    if (lastAppliedIndex != checkpointTrxnInfo.getTransactionIndex()) {
      // Install Snapshot failed and old state was reloaded. Return null to
      // Ratis to indicate that installation failed.
      return null;
    }

    return TermIndex.valueOf(term, lastAppliedIndex);
  }

  /**
   * Re-instantiate MetadataManager with new DB checkpoint.
   * All the classes which use/ store MetadataManager should also be updated
   * with the new MetadataManager instance.
   *
   * <p>Currently this simply delegates to {@link #startServices()}.
   */
  void reloadSCMState()
      throws IOException {
    startServices();
  }

  /**
   * {@inheritDoc}
   *
   * <p>Does nothing when HA is disabled. Otherwise stops the Ratis server,
   * closes the SCM state machine and stops the inter-SCM gRPC service, in
   * that order.
   */
  @Override
  public void shutdown() throws IOException {
    if (ratisServer != null) {
      ratisServer.stop();
      ratisServer.getSCMStateMachine().close();
      grpcServer.stop();
    }
  }

  /**
   * Adds the requesting SCM to the Ratis ring after checking that it
   * belongs to the same cluster as this SCM.
   *
   * @param request details of the SCM asking to join
   * @return the result of {@code SCMRatisServer.addSCM(request)}
   * @throws IOException if the request's cluster id differs from this
   *                     SCM's cluster id
   */
  @Override
  public boolean addSCM(AddSCMRequest request) throws IOException {
    String clusterId = scm.getClusterId();
    if (!request.getClusterId().equals(clusterId)) {
      throw new IOException(
          "SCM " + request.getScmId() + " with addr " + request.getRatisAddr()
              + " has cluster Id " + request.getClusterId()
              + " but leader SCM cluster id is " + clusterId);
    }
    Preconditions.checkNotNull(
        getRatisServer().getDivision().getGroup().getGroupId());
    return getRatisServer().addSCM(request);
  }

  /**
   * Stops the services that must be down while the DB is replaced.
   */
  void stopServices() throws Exception {

    // just stop the SCMMetaData store. All other background
    // services will be in pausing state in the follower.
    scm.getScmMetadataStore().stop();
  }

  /**
   * Restarts the SCM metadata store and re-initializes the managers that
   * hold references to its tables (pipelines, containers, deleted blocks
   * and, when security is enabled, the certificate servers).
   */
  @VisibleForTesting
  public void startServices() throws IOException {

    // TODO: Fix the metrics ??
    final SCMMetadataStore metadataStore = scm.getScmMetadataStore();
    metadataStore.start(OzoneConfiguration.of(conf));
    scm.getPipelineManager().reinitialize(metadataStore.getPipelineTable());
    scm.getContainerManager().reinitialize(metadataStore.getContainerTable());
    scm.getScmBlockManager().getDeletedBlockLog().reinitialize(
        metadataStore.getDeletedBlocksTXTable());
    if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
      // The root certificate server may be absent on this SCM; the SCM
      // certificate server is re-initialized unconditionally.
      if (scm.getRootCertificateServer() != null) {
        scm.getRootCertificateServer().reinitialize(metadataStore);
      }
      scm.getScmCertificateServer().reinitialize(metadataStore);
    }
  }

  /**
   * Replaces the exit manager so tests can observe or suppress the exit
   * triggered when reloading SCM state fails.
   */
  @VisibleForTesting
  public void setExitManagerForTesting(ExitManager exitManagerForTesting) {
    this.exitManager = exitManagerForTesting;
  }

  /**
   * @return the logger used by this class.
   */
  @VisibleForTesting
  public static Logger getLogger() {
    return LOG;
  }
}