blob: c1997d6c899b5def8128d8cc3fd69480e38f0f0a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.HddsUtils.*;
import static org.apache.hadoop.hdds.server.ServerUtils.sanitizeUserArgs;
/**
* Hdds stateless helper functions for server side components.
*/
public final class HddsServerUtil {
/**
 * Private constructor: this final class only exposes static helpers, so it
 * is never meant to be instantiated.
 */
private HddsServerUtil() {
}
/** Logger shared by the static helpers of this class. */
private static final Logger LOG = LoggerFactory.getLogger(
HddsServerUtil.class);
/**
 * Retrieve the socket address that should be used by DataNodes to connect
 * to the SCM.
 *
 * The target host is resolved from the following settings, in decreasing
 * priority:
 * <ul>
 * <li>OZONE_SCM_DATANODE_ADDRESS_KEY</li>
 * <li>OZONE_SCM_CLIENT_ADDRESS_KEY</li>
 * <li>OZONE_SCM_NAMES (must yield at most one SCM address)</li>
 * </ul>
 * The port is read from OZONE_SCM_DATANODE_ADDRESS_KEY; when no port is
 * found there, OZONE_SCM_DATANODE_PORT_DEFAULT is used.
 *
 * @param conf configuration to read the SCM address settings from.
 * @return Target InetSocketAddress for the SCM service endpoint.
 * @throws IllegalArgumentException if OZONE_SCM_NAMES yields more than one
 *     SCM address, or if no SCM host can be determined at all.
 */
public static InetSocketAddress getScmAddressForDataNodes(
    Configuration conf) {
  Optional<String> host = getHostNameFromConfigKeys(conf,
      ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY,
      ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);

  if (!host.isPresent()) {
    // Fallback to Ozone SCM names.
    Collection<InetSocketAddress> scmAddresses = getSCMAddresses(conf);
    if (scmAddresses.size() > 1) {
      throw new IllegalArgumentException(
          ScmConfigKeys.OZONE_SCM_NAMES +
              " must contain a single hostname. Multiple SCM hosts are " +
              "currently unsupported");
    }
    // Guard the empty case explicitly: calling iterator().next() on an
    // empty collection throws NoSuchElementException and would bypass the
    // descriptive configuration error raised below.
    if (!scmAddresses.isEmpty()) {
      host = Optional.of(scmAddresses.iterator().next().getHostName());
    }
  }

  if (!host.isPresent()) {
    throw new IllegalArgumentException(
        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY +
            " must be defined. See" +
            " https://wiki.apache.org/hadoop/Ozone#Configuration "
            + "for details on configuring Ozone.");
  }

  // If no port number is specified then we'll just try the defaultBindPort.
  final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
      ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY);

  return NetUtils.createSocketAddr(host.get() + ":" +
      port.orElse(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
}
/**
 * Build the socket address on which the SCM client endpoint binds.
 *
 * The host is looked up via OZONE_SCM_CLIENT_BIND_HOST_KEY and the port via
 * OZONE_SCM_CLIENT_ADDRESS_KEY. Whichever lookup yields nothing is replaced
 * by OZONE_SCM_CLIENT_BIND_HOST_DEFAULT or OZONE_SCM_CLIENT_PORT_DEFAULT
 * respectively.
 *
 * @param conf configuration holding the SCM client settings.
 * @return Target InetSocketAddress for the SCM client endpoint.
 */
public static InetSocketAddress getScmClientBindAddress(
    Configuration conf) {
  final Optional<String> configuredHost = getHostNameFromConfigKeys(conf,
      ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY);
  final Optional<Integer> configuredPort = getPortNumberFromConfigKeys(conf,
      ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);

  final String bindHost =
      configuredHost.orElse(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT);
  final int bindPort =
      configuredPort.orElse(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT);

  return NetUtils.createSocketAddr(bindHost + ":" + bindPort);
}
/**
 * Build the socket address on which the SCM Block service endpoint binds.
 *
 * The host is looked up via OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY and the
 * port via OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY. Whichever lookup yields
 * nothing is replaced by OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT or
 * OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT respectively.
 *
 * @param conf configuration holding the SCM block client settings.
 * @return Target InetSocketAddress for the SCM block client endpoint.
 */
public static InetSocketAddress getScmBlockClientBindAddress(
    Configuration conf) {
  final Optional<String> configuredHost = getHostNameFromConfigKeys(conf,
      ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY);
  final Optional<Integer> configuredPort = getPortNumberFromConfigKeys(conf,
      ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);

  final String bindHost = configuredHost.orElse(
      ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT);
  final int bindPort = configuredPort.orElse(
      ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT);

  return NetUtils.createSocketAddr(bindHost + ":" + bindPort);
}
/**
 * Build the socket address used by the SCM security server to service
 * clients.
 *
 * The host is looked up via OZONE_SCM_SECURITY_SERVICE_BIND_HOST_KEY,
 * falling back to OZONE_SCM_SECURITY_SERVICE_BIND_HOST_DEFAULT. The port is
 * looked up via OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY; when absent, the
 * integer stored under OZONE_SCM_SECURITY_SERVICE_PORT_KEY is used (itself
 * defaulting to OZONE_SCM_SECURITY_SERVICE_PORT_DEFAULT).
 *
 * @param conf configuration holding the SCM security service settings.
 * @return Target InetSocketAddress for the SCM security service.
 */
public static InetSocketAddress getScmSecurityInetAddress(
    Configuration conf) {
  final Optional<String> configuredHost = getHostNameFromConfigKeys(conf,
      ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_BIND_HOST_KEY);
  final Optional<Integer> configuredPort = getPortNumberFromConfigKeys(conf,
      ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY);

  final String bindHost = configuredHost.orElse(
      ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_BIND_HOST_DEFAULT);

  // The port key is always read, even when the address key already
  // supplied a port; it only takes effect as the fallback value.
  final int fallbackPort = conf.getInt(
      ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_KEY,
      ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_DEFAULT);
  final int bindPort = configuredPort.orElse(fallbackPort);

  return NetUtils.createSocketAddr(bindHost + ":" + bindPort);
}
/**
 * Build the socket address on which the SCM endpoint serving DataNodes
 * binds.
 *
 * The host is looked up via OZONE_SCM_DATANODE_BIND_HOST_KEY and the port
 * via OZONE_SCM_DATANODE_ADDRESS_KEY. Whichever lookup yields nothing is
 * replaced by OZONE_SCM_DATANODE_BIND_HOST_DEFAULT or
 * OZONE_SCM_DATANODE_PORT_DEFAULT respectively.
 *
 * @param conf configuration holding the SCM datanode endpoint settings.
 * @return Target InetSocketAddress for the SCM service endpoint.
 */
public static InetSocketAddress getScmDataNodeBindAddress(
    Configuration conf) {
  final Optional<String> configuredHost = getHostNameFromConfigKeys(conf,
      ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY);
  // A missing port simply means the default port is used below.
  final Optional<Integer> configuredPort = getPortNumberFromConfigKeys(conf,
      ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY);

  final String bindHost = configuredHost.orElse(
      ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_DEFAULT);
  final int bindPort = configuredPort.orElse(
      ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT);

  return NetUtils.createSocketAddr(bindHost + ":" + bindPort);
}
/**
 * Returns the interval at which the heartbeat processor thread runs.
 *
 * The value is read from OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, falling back
 * to OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_DEFAULT.
 *
 * @param conf - Configuration
 * @return the interval as a long, in milliseconds.
 */
public static long getScmheartbeatCheckerInterval(Configuration conf) {
  final long processIntervalMs = conf.getTimeDuration(
      ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
      ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_DEFAULT,
      TimeUnit.MILLISECONDS);
  return processIntervalMs;
}
/**
 * Heartbeat Interval - the frequency at which a datanode sends heartbeats
 * to SCM.
 *
 * The value is read from HDDS_HEARTBEAT_INTERVAL, falling back to
 * HDDS_HEARTBEAT_INTERVAL_DEFAULT.
 *
 * @param conf - Ozone Config
 * @return - heartbeat interval in milliseconds.
 */
public static long getScmHeartbeatInterval(Configuration conf) {
  final long heartbeatIntervalMs = conf.getTimeDuration(
      HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL,
      HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL_DEFAULT,
      TimeUnit.MILLISECONDS);
  return heartbeatIntervalMs;
}
/**
 * Get the Stale Node interval: the duration of missing heartbeats after
 * which SCM flags a datanode as stale.
 *
 * The configured OZONE_SCM_STALENODE_INTERVAL is passed through
 * sanitizeUserArgs twice, first relative to the heartbeat processing
 * interval and then relative to the datanode heartbeat interval.
 *
 * @param conf - Configuration.
 * @return - Long, Milliseconds to wait before flagging a node as stale.
 */
public static long getStaleNodeInterval(Configuration conf) {
  final long configuredStaleMs = conf.getTimeDuration(
      OZONE_SCM_STALENODE_INTERVAL, OZONE_SCM_STALENODE_INTERVAL_DEFAULT,
      TimeUnit.MILLISECONDS);
  final long processIntervalMs = getScmheartbeatCheckerInterval(conf);
  final long datanodeHeartbeatMs = getScmHeartbeatInterval(conf);

  // The stale interval has to sit well above the frequency of the
  // heartbeat processing thread: sanitize it with a minimum factor of 5
  // and a maximum factor of 1000 relative to that frequency.
  final long boundedByProcessInterval = sanitizeUserArgs(
      OZONE_SCM_STALENODE_INTERVAL, configuredStaleMs,
      OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, processIntervalMs, 5, 1000);

  // It also has to exceed the interval at which datanodes send their
  // heartbeats: sanitize again with factors 3 and 1000.
  return sanitizeUserArgs(OZONE_SCM_STALENODE_INTERVAL,
      boundedByProcessInterval, HDDS_HEARTBEAT_INTERVAL,
      datanodeHeartbeatMs, 3, 1000);
}
/**
 * Gets the interval for dead node flagging. This value has to be greater
 * than the stale node interval, and therefore transitively greater than the
 * heartbeat interval and the heartbeat processing interval.
 *
 * @param conf - Configuration.
 * @return - the interval for dead node flagging, in milliseconds.
 */
public static long getDeadNodeInterval(Configuration conf) {
  final long staleMs = getStaleNodeInterval(conf);
  final long configuredDeadMs = conf.getTimeDuration(
      OZONE_SCM_DEADNODE_INTERVAL, OZONE_SCM_DEADNODE_INTERVAL_DEFAULT,
      TimeUnit.MILLISECONDS);

  // Sanitize against the stale node interval with a minimum factor of 2
  // and a maximum factor of 1000.
  final long deadMs = sanitizeUserArgs(OZONE_SCM_DEADNODE_INTERVAL,
      configuredDeadMs, OZONE_SCM_STALENODE_INTERVAL, staleMs, 2, 1000);
  return deadMs;
}
/**
 * Timeout value for the RPC from Datanode to SCM, primarily used for
 * Heartbeats and container reports.
 *
 * The value is read from OZONE_SCM_HEARTBEAT_RPC_TIMEOUT, falling back to
 * OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT.
 *
 * @param conf - Ozone Config
 * @return - Rpc timeout in Milliseconds.
 */
public static long getScmRpcTimeOutInMilliseconds(Configuration conf) {
  final long rpcTimeoutMs = conf.getTimeDuration(
      ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT,
      ScmConfigKeys.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT,
      TimeUnit.MILLISECONDS);
  return rpcTimeoutMs;
}
/**
 * Log Warn interval.
 *
 * The value is read from OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT,
 * falling back to OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT.
 *
 * @param conf - Ozone Config
 * @return - Log warn interval.
 */
public static int getLogWarnInterval(Configuration conf) {
  final int logWarnInterval = conf.getInt(
      ScmConfigKeys.OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT,
      ScmConfigKeys.OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT);
  return logWarnInterval;
}
/**
 * Returns the Container port.
 *
 * The value is read from DFS_CONTAINER_IPC_PORT, falling back to
 * DFS_CONTAINER_IPC_PORT_DEFAULT.
 *
 * @param conf - Conf
 * @return port number.
 */
public static int getContainerPort(Configuration conf) {
  final int containerPort = conf.getInt(
      OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
      OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
  return containerPort;
}
/**
 * Return the service addresses for the Ozone SCM. This method is used by
 * the DataNodes to determine the service instances to connect to.
 *
 * The result is a two-level map: OZONE_SCM_SERVICE_ID maps to an inner map
 * in which OZONE_SCM_SERVICE_INSTANCE_ID maps to the address returned by
 * getScmAddressForDataNodes.
 *
 * @param conf configuration to resolve the SCM address from.
 * @return map of SCM service id to its instance addresses.
 */
public static Map<String, ? extends Map<String, InetSocketAddress>>
    getScmServiceRpcAddresses(Configuration conf) {
  final InetSocketAddress scmAddress = getScmAddressForDataNodes(conf);

  final Map<String, InetSocketAddress> instanceAddresses = new HashMap<>();
  instanceAddresses.put(OZONE_SCM_SERVICE_INSTANCE_ID, scmAddress);

  final Map<String, Map<String, InetSocketAddress>> addressesByService =
      new HashMap<>();
  addressesByService.put(OZONE_SCM_SERVICE_ID, instanceAddresses);
  return addressesByService;
}
/**
 * Get the Ratis storage directory for the datanode.
 *
 * Returns the value of DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR when it is
 * set to a non-empty string; otherwise returns the directory supplied by
 * getDefaultRatisDirectory.
 *
 * @param conf - Configuration
 * @return the Ratis storage directory as a string.
 */
public static String getOzoneDatanodeRatisDirectory(Configuration conf) {
  final String configuredDir = conf.get(
      OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
  return Strings.isNullOrEmpty(configuredDir)
      ? getDefaultRatisDirectory(conf)
      : configuredDir;
}
/**
 * Get the default Ratis directory: a "ratis" entry under the Ozone
 * metadata directory. A warning is logged on every call, because this is
 * the fallback for a Ratis storage directory that was not configured.
 *
 * @param conf - Configuration
 * @return the path of the default Ratis directory as a string.
 */
public static String getDefaultRatisDirectory(Configuration conf) {
  LOG.warn("Storage directory for Ratis is not configured. It is a good " +
          "idea to map this to an SSD disk. Falling back to {}",
      HddsConfigKeys.OZONE_METADATA_DIRS);
  final File metadataDir = ServerUtils.getOzoneMetaDirPath(conf);
  final File ratisDir = new File(metadataDir, "ratis");
  return ratisDir.getPath();
}
/**
 * Get the path for datanode id file.
 *
 * The containing directory is read from OZONE_SCM_DATANODE_ID_DIR. When
 * that setting is missing or empty, the Ozone metadata directory is used
 * instead. The file name is always OZONE_SCM_DATANODE_ID_FILE_DEFAULT.
 *
 * @param conf - Configuration
 * @return the path of datanode id as string
 * @throws IllegalArgumentException if the fallback metadata directory
 *     cannot be located.
 */
public static String getDatanodeIdFilePath(Configuration conf) {
  String dataNodeIDDirPath =
      conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR);
  // Treat an empty value like an unset one: an empty parent would make the
  // id file resolve against the filesystem root instead of a real directory.
  if (Strings.isNullOrEmpty(dataNodeIDDirPath)) {
    File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
    if (metaDirPath == null) {
      // this means meta data is not found, in theory should not happen at
      // this point because should've failed earlier.
      throw new IllegalArgumentException("Unable to locate meta data " +
          "directory when getting datanode id path");
    }
    dataNodeIDDirPath = metaDirPath.toString();
  }
  // Use default datanode id file name for file path
  return new File(dataNodeIDDirPath,
      OzoneConsts.OZONE_SCM_DATANODE_ID_FILE_DEFAULT).toString();
}
}