/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.utils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.function.SupplierWithIOException;
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.db.DBDefinition;
import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.RocksDBConfiguration;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURATION;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURATION_DEFAULT;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER;
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
/**
 * Utility class used by SCM and OM for HA.
*/
public final class HAUtils {
public static final Logger LOG = LoggerFactory.getLogger(HAUtils.class);
private HAUtils() {
}
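  /**
   * Fetch {@link ScmInfo} from SCM, retrying for up to
   * {@code OZONE_SCM_INFO_WAIT_DURATION} before giving up.
   *
   * <p>A minimal usage sketch, assuming the configuration already points at a
   * running SCM:
   * <pre>{@code
   * OzoneConfiguration conf = new OzoneConfiguration();
   * ScmInfo scmInfo = HAUtils.getScmInfo(conf);
   * String clusterId = scmInfo.getClusterId();
   * }</pre>
   *
   * @param conf Ozone configuration used to locate SCM.
   * @return the {@link ScmInfo} reported by SCM.
   * @throws IOException if SCM info cannot be obtained.
   */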
public static ScmInfo getScmInfo(OzoneConfiguration conf)
throws IOException {
OzoneConfiguration configuration = new OzoneConfiguration(conf);
try {
long duration = conf.getTimeDuration(OZONE_SCM_INFO_WAIT_DURATION,
OZONE_SCM_INFO_WAIT_DURATION_DEFAULT, TimeUnit.SECONDS);
SCMClientConfig scmClientConfig =
configuration.getObject(SCMClientConfig.class);
      int retryCount =
          (int) (duration / (scmClientConfig.getRetryInterval() / 1000));
      // If the computed retry count is larger than the configured default,
      // raise the retry count so the client keeps retrying for the full wait
      // duration; otherwise fall back to the default retry count.
      if (retryCount > scmClientConfig.getRetryCount()) {
        scmClientConfig.setRetryCount(retryCount);
        configuration.setFromObject(scmClientConfig);
      }
return getScmBlockClient(configuration).getScmInfo();
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException("Failed to get SCM info", e);
}
}
/**
* Add SCM to the cluster.
* @param conf - OzoneConfiguration
* @param request - AddSCMRequest which has details of SCM to be added.
* @param selfId - Node Id of the SCM which is submitting the request to
* add SCM.
* @return true - if SCM node is added successfully, else false.
* @throws IOException
*/
public static boolean addSCM(OzoneConfiguration conf, AddSCMRequest request,
String selfId) throws IOException {
OzoneConfiguration config = SCMHAUtils.removeSelfId(conf, selfId);
try {
return getScmBlockClient(config).addSCM(request);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException("Failed to add SCM", e);
}
}
/**
   * Create an SCM block location client backed by a failover proxy provider
   * and wrapped with tracing.
   *
   * @param conf Ozone configuration used to resolve the SCM addresses.
   * @return {@link ScmBlockLocationProtocol}
*/
public static ScmBlockLocationProtocol getScmBlockClient(
OzoneConfiguration conf) {
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
new SCMBlockLocationFailoverProxyProvider(conf));
return TracingUtil
.createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class,
conf);
}
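  /**
   * Create an SCM container location client backed by a failover proxy
   * provider and wrapped with tracing.
   *
   * @param conf configuration used to resolve the SCM addresses.
   * @return {@link StorageContainerLocationProtocol}
   */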
public static StorageContainerLocationProtocol getScmContainerClient(
ConfigurationSource conf) {
SCMContainerLocationFailoverProxyProvider proxyProvider =
new SCMContainerLocationFailoverProxyProvider(conf);
StorageContainerLocationProtocol scmContainerClient =
TracingUtil.createProxy(
new StorageContainerLocationProtocolClientSideTranslatorPB(
proxyProvider), StorageContainerLocationProtocol.class, conf);
return scmContainerClient;
}
/**
* Replace the current DB with the new DB checkpoint.
*
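   * <p>A minimal usage sketch; {@code checkpointLocation} is assumed to point
   * at an already downloaded checkpoint directory and {@code "scm.db.backup."}
   * is only an illustrative prefix:
   * <pre>{@code
   * File backup = HAUtils.replaceDBWithCheckpoint(lastAppliedIndex,
   *     currentDbFile, checkpointLocation, "scm.db.backup.");
   * }</pre>
   *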
   * @param lastAppliedIndex the last applied index in the current SCM DB.
   * @param oldDB the current DB to be replaced.
   * @param checkpointPath path to the new DB checkpoint.
   * @param dbPrefix prefix used when naming the backup of the current DB.
   * @return location of the backup of the original DB.
   * @throws IOException if the DB files cannot be moved.
*/
public static File replaceDBWithCheckpoint(long lastAppliedIndex,
File oldDB, Path checkpointPath, String dbPrefix) throws IOException {
// Take a backup of the current DB
String dbBackupName =
dbPrefix + lastAppliedIndex + "_" + System
.currentTimeMillis();
File dbDir = oldDB.getParentFile();
File dbBackup = new File(dbDir, dbBackupName);
try {
Files.move(oldDB.toPath(), dbBackup.toPath());
} catch (IOException e) {
LOG.error("Failed to create a backup of the current DB. Aborting "
+ "snapshot installation.");
throw e;
}
// Move the new DB checkpoint into the metadata dir
Path markerFile = new File(dbDir, DB_TRANSIENT_MARKER).toPath();
try {
      // Create a transient marker file. This file is deleted once the
      // checkpoint DB has been moved to the old DB location, or once the old
      // DB backup has been restored to its original location. If neither
      // happens, the DB is in an inconsistent state and the presence of this
      // marker file prevents the service from starting up.
Files.createFile(markerFile);
Files.move(checkpointPath, oldDB.toPath());
Files.deleteIfExists(markerFile);
} catch (IOException e) {
LOG.error("Failed to move downloaded DB checkpoint {} to metadata "
+ "directory {}. Resetting to original DB.", checkpointPath,
oldDB.toPath());
try {
Files.move(dbBackup.toPath(), oldDB.toPath());
Files.deleteIfExists(markerFile);
} catch (IOException ex) {
String errorMsg = "Failed to reset to original DB. SCM is in an "
+ "inconsistent state.";
ExitUtils.terminate(1, errorMsg, ex, LOG);
}
throw e;
}
return dbBackup;
}
/**
 * Obtain TransactionInfo from a DB checkpoint.
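 *
 * <p>A minimal usage sketch, assuming {@code checkpointLocation} is the path
 * to a downloaded DB checkpoint and {@code dbDefinition} describes its
 * column families:
 * <pre>{@code
 * TransactionInfo txnInfo = HAUtils.getTrxnInfoFromCheckpoint(
 *     conf, checkpointLocation, dbDefinition);
 * }</pre>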
*/
public static TransactionInfo getTrxnInfoFromCheckpoint(
OzoneConfiguration conf, Path dbPath, DBDefinition definition)
throws Exception {
if (dbPath != null) {
Path dbDir = dbPath.getParent();
Path dbFile = dbPath.getFileName();
if (dbDir != null && dbFile != null) {
return getTransactionInfoFromDB(conf, dbDir, dbFile.toString(),
definition);
}
}
throw new IOException("Checkpoint " + dbPath + " does not have proper " +
"DB location");
}
/**
   * Obtain TransactionInfo from the DB.
   *
   * @param tempConfig configuration used to open the DB.
   * @param dbDir path to the DB directory.
   * @param dbName name of the DB.
   * @param definition DB definition describing the column families.
   * @return TransactionInfo
   * @throws Exception if the DB cannot be opened or read.
*/
private static TransactionInfo getTransactionInfoFromDB(
OzoneConfiguration tempConfig, Path dbDir, String dbName,
DBDefinition definition)
throws Exception {
DBStore dbStore = loadDB(tempConfig, dbDir.toFile(),
dbName, definition);
    // Look up the table whose value type is TransactionInfo. The transaction
    // info table names are different in SCM and OM, so the table is located
    // by value type. If a new table is ever added with TransactionInfo as its
    // value type, this logic may not work.
Table<String, TransactionInfo> transactionInfoTable =
getTransactionInfoTable(dbStore, definition);
TransactionInfo transactionInfo =
transactionInfoTable.get(TRANSACTION_INFO_KEY);
dbStore.close();
if (transactionInfo == null) {
throw new IOException("Failed to read TransactionInfo from DB " +
definition.getName() + " at " + dbDir);
}
return transactionInfo;
}
public static Table<String, TransactionInfo> getTransactionInfoTable(
DBStore dbStore, DBDefinition definition) throws IOException {
return Arrays.stream(definition.getColumnFamilies())
.filter(t -> t.getValueType() == TransactionInfo.class).findFirst()
.get().getTable(dbStore);
}
/**
* Verify transaction info with provided lastAppliedIndex.
*
   * If the checkpoint's transaction index is less than or equal to
   * lastAppliedIndex, the checkpoint is stale: the downloaded DB is deleted
   * and false is returned; otherwise true is returned.
   *
   * @param transactionInfo transaction info read from the checkpoint.
   * @param lastAppliedIndex the last applied index of the current DB.
   * @param leaderId node id of the leader the checkpoint was downloaded from.
   * @param newDBlocation location of the downloaded DB checkpoint.
   * @param logger logger to use for reporting.
   * @return false if the checkpoint is stale, true otherwise.
*/
public static boolean verifyTransactionInfo(TransactionInfo transactionInfo,
long lastAppliedIndex, String leaderId, Path newDBlocation,
Logger logger) {
if (transactionInfo.getTransactionIndex() <= lastAppliedIndex) {
logger.error("Failed to install checkpoint from SCM leader: {}"
+ ". The last applied index: {} is greater than or equal to the "
+ "checkpoint's applied index: {}. Deleting the downloaded "
+ "checkpoint {}", leaderId, lastAppliedIndex,
transactionInfo.getTransactionIndex(), newDBlocation);
try {
FileUtils.deleteFully(newDBlocation);
} catch (IOException e) {
logger.error("Failed to fully delete the downloaded DB "
+ "checkpoint {} from SCM leader {}.", newDBlocation, leaderId, e);
}
return false;
}
return true;
}
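  /**
   * Open a {@link DBStore} for the given {@link DBDefinition}, registering
   * every column family and its key/value codecs declared by the definition.
   *
   * @param configuration configuration used to build the store.
   * @param metaDir directory containing the DB.
   * @param dbName name of the DB.
   * @param definition DB definition describing the column families.
   * @return the opened DB store; the caller is responsible for closing it.
   * @throws IOException if the DB cannot be opened.
   */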
public static DBStore loadDB(OzoneConfiguration configuration, File metaDir,
String dbName, DBDefinition definition) throws IOException {
RocksDBConfiguration rocksDBConfiguration =
configuration.getObject(RocksDBConfiguration.class);
DBStoreBuilder dbStoreBuilder =
DBStoreBuilder.newBuilder(configuration, rocksDBConfiguration)
.setName(dbName).setPath(Paths.get(metaDir.getPath()));
// Add column family names and codecs.
for (DBColumnFamilyDefinition columnFamily : definition
.getColumnFamilies()) {
dbStoreBuilder.addTable(columnFamily.getName());
dbStoreBuilder
.addCodec(columnFamily.getKeyType(), columnFamily.getKeyCodec());
dbStoreBuilder
.addCodec(columnFamily.getValueType(), columnFamily.getValueCodec());
}
return dbStoreBuilder.build();
}
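  /**
   * Resolve the metadata directory for the given DB definition, falling back
   * to {@link HddsConfigKeys#OZONE_METADATA_DIRS} when the DB specific
   * location key is not configured.
   */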
public static File getMetaDir(DBDefinition definition,
OzoneConfiguration configuration) {
// Set metadata dirs.
File metadataDir = definition.getDBLocation(configuration);
if (metadataDir == null) {
LOG.warn("{} is not configured. We recommend adding this setting. "
+ "Falling back to {} instead.",
definition.getLocationConfigKey(), HddsConfigKeys.
OZONE_METADATA_DIRS);
metadataDir = getOzoneMetaDirPath(configuration);
}
return metadataDir;
}
/**
 * Unwrap the given exception and check whether its cause chain contains an
 * {@link AccessControlException}.
*/
public static boolean isAccessControlException(Exception ex) {
if (ex instanceof ServiceException) {
Throwable t = ex.getCause();
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
}
while (t != null) {
if (t instanceof AccessControlException) {
return true;
}
t = t.getCause();
}
}
return false;
}
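  /**
   * Exit the process when security is enabled on a multi-SCM cluster but SCM
   * HA security support
   * ({@link ScmConfigKeys#OZONE_SCM_HA_SECURITY_SUPPORTED}) is disabled.
   */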
public static void checkSecurityAndSCMHAEnabled(OzoneConfiguration conf) {
boolean enable =
conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_SECURITY_SUPPORTED,
ScmConfigKeys.OZONE_SCM_HA_SECURITY_SUPPORTED_DEFAULT);
if (OzoneSecurityUtil.isSecurityEnabled(conf) && !enable) {
List<SCMNodeInfo> scmNodeInfo = SCMNodeInfo.buildNodeInfo(conf);
if (scmNodeInfo.size() > 1) {
System.err.println("Ozone Services cannot be started on a secure SCM " +
"HA enabled cluster");
System.exit(1);
}
}
}
/**
   * Build the CA certificate list that needs to be passed to clients.
   *
   * If the certificate client is null, the CA list is obtained through the
   * SCM security client; otherwise the certificate client is used.
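   *
   * <p>A minimal usage sketch, assuming {@code conf} is an already loaded
   * configuration and no certificate client is available (which forces the
   * SCM security client path):
   * <pre>{@code
   * List<String> caPemList = HAUtils.buildCAList(null, conf);
   * }</pre>
   *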
   * @param certClient certificate client, may be null.
   * @param configuration configuration used to reach SCM.
   * @return list of PEM encoded CA certificates.
   * @throws IOException if the CA list cannot be obtained.
*/
public static List<String> buildCAList(CertificateClient certClient,
ConfigurationSource configuration) throws IOException {
//TODO: make it configurable.
List<String> caCertPemList;
if (certClient != null) {
caCertPemList = new ArrayList<>();
if (!SCMHAUtils.isSCMHAEnabled(configuration)) {
if (certClient.getRootCACertificate() != null) {
caCertPemList.add(CertificateCodec.getPEMEncodedString(
certClient.getRootCACertificate()));
}
caCertPemList.add(CertificateCodec.getPEMEncodedString(
certClient.getCACertificate()));
} else {
Collection<String> scmNodes = SCMHAUtils.getSCMNodeIds(configuration);
int expectedCount = scmNodes.size() + 1;
if (scmNodes.size() > 1) {
          // First check whether the cert client already has the CA list
          // initialized. This avoids calling SCM when this method is invoked
          // multiple times; the in-memory list is returned instead.
caCertPemList = certClient.getCAList();
if (caCertPemList != null && caCertPemList.size() == expectedCount) {
return caCertPemList;
}
caCertPemList = waitForCACerts(() -> certClient.updateCAList(),
expectedCount);
checkCertCount(caCertPemList.size(), expectedCount);
} else {
caCertPemList = certClient.listCA();
}
}
} else {
SCMSecurityProtocolClientSideTranslatorPB scmSecurityProtocolClient =
HddsServerUtil.getScmSecurityClient(configuration);
if (!SCMHAUtils.isSCMHAEnabled(configuration)) {
caCertPemList = new ArrayList<>();
SCMGetCertResponseProto scmGetCertResponseProto =
scmSecurityProtocolClient.getCACert();
if (scmGetCertResponseProto.hasX509Certificate()) {
caCertPemList.add(scmGetCertResponseProto.getX509Certificate());
}
if (scmGetCertResponseProto.hasX509RootCACertificate()) {
caCertPemList.add(scmGetCertResponseProto.getX509RootCACertificate());
}
} else {
Collection<String> scmNodes = SCMHAUtils.getSCMNodeIds(configuration);
int expectedCount = scmNodes.size() + 1;
if (scmNodes.size() > 1) {
caCertPemList = waitForCACerts(
() -> scmSecurityProtocolClient.listCACertificate(),
expectedCount);
checkCertCount(caCertPemList.size(), expectedCount);
      } else {
caCertPemList = scmSecurityProtocolClient.listCACertificate();
}
}
}
return caCertPemList;
}
private static List<String> waitForCACerts(
final SupplierWithIOException<List<String>> applyFunction,
int expectedCount) throws IOException {
    // TODO: make wait time and sleep time configurable if needed.
    // TODO: if SCMs are bootstrapped later, the CA list needs to be
    // re-fetched whenever its size is less than the SCM HA node list size.
    // For now, when an SCM client starts we compare the node list size with
    // the CA list size and return the CA list once they match as expected.
boolean caListUpToDate;
long waitTime = 5 * 60 * 1000L;
long retryTime = 10 * 1000L;
long currentTime = Time.monotonicNow();
List<String> caCertPemList;
do {
caCertPemList = applyFunction.get();
      caListUpToDate = caCertPemList.size() == expectedCount;
if (!caListUpToDate) {
LOG.info("Expected CA list size {}, where as received CA List size " +
"{}. Retry to fetch CA List after {} seconds", expectedCount,
caCertPemList.size(), waitTime/1000);
try {
Thread.sleep(retryTime);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
} while (!caListUpToDate &&
Time.monotonicNow() - currentTime < waitTime);
return caCertPemList;
}
private static void checkCertCount(int certCount, int expectedCount)
      throws SCMSecurityException {
    if (certCount != expectedCount) {
      LOG.error("Unable to obtain CA list for SCM cluster: obtained CA list " +
          "size is {}, whereas the expected list size is {}.",
          certCount, expectedCount);
throw new SCMSecurityException("Unable to obtain complete CA list");
}
}
/**
   * Build the CA list as {@link X509Certificate} objects.
   *
   * If the certificate client is null, the CA list is obtained through the
   * SCM security client; otherwise the certificate client is used.
   *
   * @param certClient certificate client, may be null.
   * @param conf configuration used to reach SCM.
   * @return list of CA X509Certificates.
   * @throws IOException if the CA list cannot be obtained.
*/
public static List<X509Certificate> buildCAX509List(
CertificateClient certClient,
ConfigurationSource conf) throws IOException {
if (certClient != null) {
      // Handle this case here to avoid an extra conversion from X509 to PEM
      // and back to X509 by buildCAList.
if (!SCMHAUtils.isSCMHAEnabled(conf)) {
List<X509Certificate> x509Certificates = new ArrayList<>();
if (certClient.getRootCACertificate() != null) {
x509Certificates.add(certClient.getRootCACertificate());
}
x509Certificates.add(certClient.getCACertificate());
return x509Certificates;
}
}
List<String> pemEncodedCerts = HAUtils.buildCAList(certClient, conf);
return OzoneSecurityUtil.convertToX509(pemEncodedCerts);
}
}