/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.utils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.protobuf.BlockingService;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.recon.ReconConfigKeys;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.security.UserGroupInformation;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
import static org.apache.hadoop.hdds.HddsUtils.getSingleSCMAddress;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_RETRY_COUNT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_RETRY_COUNT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_RETRY_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_RETRY_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.server.ServerUtils.sanitizeUserArgs;
/**
* Stateless HDDS helper functions for server-side components.
*/
public final class HddsServerUtil {
private HddsServerUtil() {
}
private static final Logger LOG = LoggerFactory.getLogger(
HddsServerUtil.class);
/**
* Add a protobuf-based protocol to the {@link RPC.Server}.
* @param conf configuration
* @param protocol the protocol interface
* @param service service that implements the protocol
* @param server RPC server to which the protocol and its implementation are added
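* <p>A minimal usage sketch; the protocol class, generated protobuf
* service and RPC server below are hypothetical placeholders:
* <pre>{@code
* // Hypothetical generated service and pre-built RPC server.
* BlockingService service = MyProtocolProtos.MyService
*     .newReflectiveBlockingService(serverHandler);
* HddsServerUtil.addPBProtocol(conf, MyProtocolPB.class, service, rpcServer);
* }</pre>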
*/
public static void addPBProtocol(Configuration conf, Class<?> protocol,
BlockingService service, RPC.Server server) throws IOException {
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
}
/**
* Retrieve the socket address that should be used by DataNodes to connect
* to the SCM.
*
* @param conf configuration source
* @return Target {@code InetSocketAddress} for the SCM service endpoint.
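* <p>A hedged resolution sketch (host and port values are illustrative):
* <pre>{@code
* OzoneConfiguration conf = new OzoneConfiguration();
* conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "scm1.example.com:9860");
* // OZONE_SCM_DATANODE_ADDRESS_KEY is unset, so the host falls back to the
* // client address key and the port to OZONE_SCM_DATANODE_PORT_DEFAULT.
* InetSocketAddress scmAddr = HddsServerUtil.getScmAddressForDataNodes(conf);
* }</pre>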
*/
public static InetSocketAddress getScmAddressForDataNodes(
ConfigurationSource conf) {
// We try the following settings in decreasing priority to retrieve the
// target host.
// - OZONE_SCM_DATANODE_ADDRESS_KEY
// - OZONE_SCM_CLIENT_ADDRESS_KEY
// - OZONE_SCM_NAMES
//
Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
if (!host.isPresent()) {
// Fallback to Ozone SCM name
host = Optional.of(getSingleSCMAddress(conf).getHostName());
}
final int port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY)
.orElse(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT);
return NetUtils.createSocketAddr(host.get() + ":" + port);
}
/**
* Retrieve the socket address that the SCM client RPC server should bind
* to.
*
* @param conf configuration source
* @return Target {@code InetSocketAddress} for the SCM client endpoint.
*/
public static InetSocketAddress getScmClientBindAddress(
ConfigurationSource conf) {
final String host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY)
.orElse(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT);
final int port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY)
.orElse(conf.getInt(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY,
ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
return NetUtils.createSocketAddr(host + ":" + port);
}
/**
* Retrieve the socket address that the SCM block client RPC server should
* bind to.
*
* @param conf configuration source
* @return Target {@code InetSocketAddress} for the SCM block client endpoint.
*/
public static InetSocketAddress getScmBlockClientBindAddress(
ConfigurationSource conf) {
final String host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY)
.orElse(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT);
final int port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY)
.orElse(conf.getInt(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_KEY,
ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
return NetUtils.createSocketAddr(host + ":" + port);
}
/**
* Retrieve the socket address that the SCM security service should bind
* to.
*
* @param conf configuration source
* @return Target {@code InetSocketAddress} for the SCM security service.
*/
public static InetSocketAddress getScmSecurityInetAddress(
ConfigurationSource conf) {
final String host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_BIND_HOST_KEY)
.orElse(ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_BIND_HOST_DEFAULT);
final OptionalInt port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY);
return NetUtils.createSocketAddr(host + ":" + port.orElse(
conf.getInt(ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_KEY,
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_DEFAULT)));
}
/**
* Retrieve the socket address that the SCM DataNode service should bind
* to.
*
* @param conf configuration source
* @return Target {@code InetSocketAddress} for the SCM DataNode endpoint.
*/
public static InetSocketAddress getScmDataNodeBindAddress(
ConfigurationSource conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY);
final OptionalInt port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.orElse(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" +
port.orElse(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
}
/**
* Retrieve the socket address that the Recon DataNode service should bind
* to.
*
* @param conf configuration source
* @return Target {@code InetSocketAddress} for the Recon DataNode endpoint.
*/
public static InetSocketAddress getReconDataNodeBindAddress(
ConfigurationSource conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
ReconConfigKeys.OZONE_RECON_DATANODE_BIND_HOST_KEY);
final OptionalInt port = getPortNumberFromConfigKeys(conf,
ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY);
return NetUtils.createSocketAddr(
host.orElse(
ReconConfigKeys.OZONE_RECON_DATANODE_BIND_HOST_DEFAULT) + ":" +
port.orElse(ReconConfigKeys.OZONE_RECON_DATANODE_PORT_DEFAULT));
}
/**
* Returns the interval in which the heartbeat processor thread runs.
*
* @param conf - Configuration
* @return heartbeat checker interval in milliseconds.
*/
public static long getScmheartbeatCheckerInterval(ConfigurationSource conf) {
return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
}
/**
* Heartbeat Interval - Defines the heartbeat frequency from a datanode to
* SCM.
*
* @param conf - Ozone Config
* @return - heartbeat interval in milliseconds.
*/
public static long getScmHeartbeatInterval(ConfigurationSource conf) {
return conf.getTimeDuration(HDDS_HEARTBEAT_INTERVAL,
HDDS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
}
/**
* Get the Stale Node interval, which is used by SCM to flag a datanode as
* stale, if the heartbeat from that node has been missing for this duration.
*
* @param conf - Configuration.
* @return - milliseconds to wait before flagging a node as stale.
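* <p>For example, assuming the default 30 second heartbeat checker
* interval, a configured stale node interval below 150 seconds (5x the
* checker interval) fails the sanity check in {@code sanitizeUserArgs}.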
*/
public static long getStaleNodeInterval(ConfigurationSource conf) {
long staleNodeIntervalMs =
conf.getTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
OZONE_SCM_STALENODE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
long heartbeatThreadFrequencyMs = getScmheartbeatCheckerInterval(conf);
long heartbeatIntervalMs = getScmHeartbeatInterval(conf);
// Make sure that the stale node interval is configured well above the
// frequency at which the heartbeat checker thread runs.
//
// Here we check that staleNodeInterval is at least five times the
// heartbeat checker interval.
staleNodeIntervalMs = sanitizeUserArgs(OZONE_SCM_STALENODE_INTERVAL,
staleNodeIntervalMs, OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
heartbeatThreadFrequencyMs, 5, 1000);
// Make sure that the stale node interval is greater than the interval at
// which datanodes send heartbeats.
staleNodeIntervalMs = sanitizeUserArgs(OZONE_SCM_STALENODE_INTERVAL,
staleNodeIntervalMs, HDDS_HEARTBEAT_INTERVAL, heartbeatIntervalMs, 3,
1000);
return staleNodeIntervalMs;
}
/**
* Gets the interval for dead node flagging. This has to be greater than
* the stale node interval, and by transitivity it is also greater than
* the heartbeat interval and the heartbeat processing interval.
*
* @param conf - Configuration.
* @return - the interval for dead node flagging.
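* <p>For example, with a 5 minute stale node interval the dead node
* interval must lie between 10 minutes (2x) and 5000 minutes (1000x) to
* pass the sanity check.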
*/
public static long getDeadNodeInterval(ConfigurationSource conf) {
long staleNodeIntervalMs = getStaleNodeInterval(conf);
long deadNodeIntervalMs = conf.getTimeDuration(OZONE_SCM_DEADNODE_INTERVAL,
OZONE_SCM_DEADNODE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
// Make sure that the dead node interval is at least twice the stale node
// interval, with a maximum of 1000 times the stale node interval.
return sanitizeUserArgs(OZONE_SCM_DEADNODE_INTERVAL, deadNodeIntervalMs,
OZONE_SCM_STALENODE_INTERVAL, staleNodeIntervalMs, 2, 1000);
}
/**
* Timeout value for the RPC from Datanode to SCM, primarily used for
* Heartbeats and container reports.
*
* @param conf - Ozone Config
* @return - RPC timeout in milliseconds.
*/
public static long getScmRpcTimeOutInMilliseconds(ConfigurationSource conf) {
return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT,
OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
}
/**
* Max retry count for the RPC proxy used by the datanode
* EndpointStateMachine to connect to the SCM.
*
* @param conf - Ozone Config
* @return - Max retry count.
*/
public static int getScmRpcRetryCount(ConfigurationSource conf) {
return conf.getInt(OZONE_SCM_HEARTBEAT_RPC_RETRY_COUNT,
OZONE_SCM_HEARTBEAT_RPC_RETRY_COUNT_DEFAULT);
}
/**
* Fixed RPC retry interval used by the datanode when connecting to the
* SCM.
*
* @param conf - Ozone Config
* @return - RPC retry interval in milliseconds.
*/
public static long getScmRpcRetryInterval(ConfigurationSource conf) {
return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_RETRY_INTERVAL,
OZONE_SCM_HEARTBEAT_RPC_RETRY_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
}
/**
* Heartbeat log warning interval: the number of heartbeats between
* heartbeat-related warning log messages.
*
* @param conf - Ozone Config
* @return - log warn interval, in number of heartbeats.
*/
public static int getLogWarnInterval(ConfigurationSource conf) {
return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT,
OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT);
}
/**
* Returns the container IPC port.
* @param conf - Conf
* @return port number.
*/
public static int getContainerPort(ConfigurationSource conf) {
return conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
}
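/**
* Get the Ratis storage directories configured for the datanode, falling
* back to the default Ratis directory when none are configured.
*
* @param conf configuration source
* @return collection of Ratis storage directory paths
*/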
public static Collection<String> getOzoneDatanodeRatisDirectory(
ConfigurationSource conf) {
Collection<String> rawLocations = conf.getTrimmedStringCollection(
OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
if (rawLocations.isEmpty()) {
rawLocations = new ArrayList<>(1);
rawLocations.add(ServerUtils.getDefaultRatisDirectory(conf));
}
return rawLocations;
}
/**
* Get the path for datanode id file.
*
* @param conf - Configuration
* @return the path of the datanode ID file as a string
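* <p>If {@link ScmConfigKeys#OZONE_SCM_DATANODE_ID_DIR} is not set, the
* file is placed in the Ozone metadata directory under the default file
* name {@link OzoneConsts#OZONE_SCM_DATANODE_ID_FILE_DEFAULT}.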
*/
public static String getDatanodeIdFilePath(ConfigurationSource conf) {
String dataNodeIDDirPath =
conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR);
if (dataNodeIDDirPath == null) {
File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
if (metaDirPath == null) {
// This means the metadata directory was not found; in theory this
// should not happen here because initialization should have failed
// earlier.
throw new IllegalArgumentException("Unable to locate meta data " +
"directory when getting datanode id path");
}
dataNodeIDDirPath = metaDirPath.toString();
}
// Use default datanode id file name for file path
return new File(dataNodeIDDirPath,
OzoneConsts.OZONE_SCM_DATANODE_ID_FILE_DEFAULT).toString();
}
/**
* Create an SCM security client.
* @param conf - Ozone configuration.
*
* @return {@link SCMSecurityProtocol}
* @throws IOException
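* <p>A minimal usage sketch (assumes an initialized configuration and a
* logged-in current user):
* <pre>{@code
* SCMSecurityProtocolClientSideTranslatorPB securityClient =
*     HddsServerUtil.getScmSecurityClient(conf);
* }</pre>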
*/
public static SCMSecurityProtocolClientSideTranslatorPB getScmSecurityClient(
ConfigurationSource conf) throws IOException {
return new SCMSecurityProtocolClientSideTranslatorPB(
new SCMSecurityProtocolFailoverProxyProvider(conf,
UserGroupInformation.getCurrentUser()));
}
/**
* Retrieve the socket address that should be used by clients to connect
* to the SCM for
* {@link org.apache.hadoop.hdds.protocol.SCMSecurityProtocol}. If
* {@link ScmConfigKeys#OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY} is not defined
* then {@link ScmConfigKeys#OZONE_SCM_CLIENT_ADDRESS_KEY} is used. If neither
* is defined then {@link ScmConfigKeys#OZONE_SCM_NAMES} is used.
*
* @param conf configuration source
* @return Target {@code InetSocketAddress} for the SCM security service
* endpoint.
* @throws IllegalArgumentException if configuration is not defined or invalid
*/
public static InetSocketAddress getScmAddressForSecurityProtocol(
ConfigurationSource conf) {
Optional<String> host = getHostNameFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY,
ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
if (!host.isPresent()) {
// Fallback to Ozone SCM name
host = Optional.of(getSingleSCMAddress(conf).getHostName());
}
final int port = getPortNumberFromConfigKeys(conf,
ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_KEY)
.orElse(ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_DEFAULT);
return NetUtils.createSocketAddr(host.get() + ":" + port);
}
/**
* Create an SCM security client with a tracing proxy.
*
* @return {@link SCMSecurityProtocol}
* @throws IOException
*/
public static SCMSecurityProtocol getScmSecurityClient(
OzoneConfiguration conf, UserGroupInformation ugi) throws IOException {
SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
new SCMSecurityProtocolClientSideTranslatorPB(
new SCMSecurityProtocolFailoverProxyProvider(conf, ugi));
return TracingUtil.createProxy(scmSecurityClient,
SCMSecurityProtocol.class, conf);
}
/**
* Initialize hadoop metrics system for Ozone servers.
* @param configuration OzoneConfiguration to use.
* @param serverName The logical name of the server component.
* @return the initialized {@link MetricsSystem}
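* <p>A minimal usage sketch (the server name is illustrative):
* <pre>{@code
* MetricsSystem ms =
*     HddsServerUtil.initializeMetrics(conf, "StorageContainerManager");
* }</pre>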
*/
public static MetricsSystem initializeMetrics(
OzoneConfiguration configuration, String serverName) {
MetricsSystem metricsSystem = DefaultMetricsSystem.initialize(serverName);
try {
JvmMetrics.create(serverName,
configuration.get(DFSConfigKeysLegacy.DFS_METRICS_SESSION_ID_KEY),
DefaultMetricsSystem.instance());
} catch (MetricsException e) {
LOG.info("Metrics source JvmMetrics already added to {}.", serverName);
}
return metricsSystem;
}
/**
* Write DB Checkpoint to an output stream as a compressed file (tgz).
*
* @param checkpoint checkpoint file
* @param destination destination output stream.
* @throws IOException
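* <p>A minimal usage sketch (the output path and checkpoint instance are
* illustrative):
* <pre>{@code
* try (OutputStream out =
*     Files.newOutputStream(Paths.get("/tmp/om-checkpoint.tar.gz"))) {
*   HddsServerUtil.writeDBCheckpointToStream(dbCheckpoint, out);
* }
* }</pre>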
*/
public static void writeDBCheckpointToStream(DBCheckpoint checkpoint,
OutputStream destination)
throws IOException {
try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
.createCompressorOutputStream(CompressorStreamFactory.GZIP,
destination);
ArchiveOutputStream archiveOutputStream =
new TarArchiveOutputStream(gzippedOut);
Stream<Path> files =
Files.list(checkpoint.getCheckpointLocation())) {
for (Path path : files.collect(Collectors.toList())) {
if (path != null) {
Path fileName = path.getFileName();
if (fileName != null) {
includeFile(path.toFile(), fileName.toString(),
archiveOutputStream);
}
}
}
} catch (CompressorException e) {
throw new IOException(
"Can't compress the checkpoint: " +
checkpoint.getCheckpointLocation(), e);
}
}
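/**
* Copy a single file into the archive under the given entry name.
*
* @param file the file to add
* @param entryName the name of the archive entry
* @param archiveOutputStream the archive to write into
* @throws IOException if reading the file or writing the entry fails
*/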
private static void includeFile(File file, String entryName,
ArchiveOutputStream archiveOutputStream)
throws IOException {
ArchiveEntry archiveEntry =
archiveOutputStream.createArchiveEntry(file, entryName);
archiveOutputStream.putArchiveEntry(archiveEntry);
try (FileInputStream fis = new FileInputStream(file)) {
IOUtils.copy(fis, archiveOutputStream);
}
archiveOutputStream.closeArchiveEntry();
}
/**
* Converts RocksDB exception to IOE.
* @param msg - Message to add to exception.
* @param e - Original Exception.
* @return IOE.
*/
public static IOException toIOException(String msg, RocksDBException e) {
String statusCode = e.getStatus() == null ? "N/A" :
e.getStatus().getCodeString();
String errMessage = e.getMessage() == null ? "Unknown error" :
e.getMessage();
String output = msg + "; status : " + statusCode
+ "; message : " + errMessage;
return new IOException(output, e);
}
}