// blob: e55b7340d467f954c80e96c9bd81b443db9406bc
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds;
import com.google.protobuf.ServiceException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.TreeMap;
import org.apache.hadoop.conf.ConfigRedactor;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcException;
import org.apache.hadoop.ipc.RpcNoSuchMethodException;
import org.apache.hadoop.ipc.RpcNoSuchProtocolException;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import org.apache.commons.lang3.StringUtils;
import static org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_DNS_INTERFACE_KEY;
import static org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_DNS_NAMESERVER_KEY;
import static org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_CLIENT_BIND_HOST_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_CLIENT_BIND_HOST_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_CLIENT_PORT_DEFAULT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_CLIENT_PORT_KEY;
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY;
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_PORT_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.SizeInBytes;
import org.apache.hadoop.ozone.conf.OzoneServiceConfig;
import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* HDDS specific stateless utility functions.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public final class HddsUtils {
// Logger shared by all static helpers in this class.
private static final Logger LOG = LoggerFactory.getLogger(HddsUtils.class);
/**
 * The service ID of the solitary Ozone SCM service.
 */
public static final String OZONE_SCM_SERVICE_ID = "OzoneScmService";
/**
 * Instance identifier string that accompanies {@link #OZONE_SCM_SERVICE_ID}.
 * It is not referenced by any method in this class.
 */
public static final String OZONE_SCM_SERVICE_INSTANCE_ID =
"OzoneScmServiceInstance";
// Error text for configurations listing more than one SCM host.
// NOTE(review): no method in this class references this constant — confirm
// whether it is still needed.
private static final String MULTIPLE_SCM_NOT_YET_SUPPORTED =
ScmConfigKeys.OZONE_SCM_NAMES + " must contain a single hostname."
+ " Multiple SCM hosts are currently unsupported";
/**
 * Placeholder payload that the {@code processForDebug} methods substitute
 * for binary data before a message is printed.
 */
public static final ByteString REDACTED =
ByteString.copyFromUtf8("<redacted>");
// Size of "1m" in bytes as parsed by SizeInBytes; divisor used by roundupMb.
private static final int ONE_MB = SizeInBytes.valueOf("1m").getSizeInt();
// Sentinel handed to HostAndPort#getPortOrDefault to detect a missing port.
private static final int NO_PORT = -1;
// Utility class: the private constructor prevents instantiation.
private HddsUtils() {
}
/**
 * Retrieve the socket addresses that clients should use to connect to the
 * SCM.
 *
 * <p>When an SCM service id is configured, one address per configured SCM
 * node is returned. Otherwise the address is read from
 * {@code OZONE_SCM_CLIENT_ADDRESS_KEY}, falling back to the single entry of
 * {@code OZONE_SCM_NAMES}; a missing port is filled in from
 * {@code OZONE_SCM_CLIENT_PORT_KEY} (or its default).
 *
 * @param conf configuration to read the SCM settings from
 * @return target {@code InetSocketAddress}es for the SCM client endpoint
 * @throws ConfigurationException if no usable client address is configured,
 *     if a non-HA setup lists more than one SCM name, or if the configured
 *     address has no host part
 */
public static Collection<InetSocketAddress> getScmAddressForClients(
    ConfigurationSource conf) {
  if (SCMHAUtils.getScmServiceId(conf) != null) {
    List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
    Collection<InetSocketAddress> scmAddressList =
        new HashSet<>(scmNodeInfoList.size());
    for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
      if (scmNodeInfo.getScmClientAddress() == null) {
        // The separators around "node-id" keep the two ids readable.
        throw new ConfigurationException("Ozone scm client address is not " +
            "set for SCM service-id " + scmNodeInfo.getServiceId() +
            " node-id " + scmNodeInfo.getNodeId());
      }
      scmAddressList.add(
          NetUtils.createSocketAddr(scmNodeInfo.getScmClientAddress()));
    }
    return scmAddressList;
  }

  String address = conf.getTrimmed(OZONE_SCM_CLIENT_ADDRESS_KEY);
  final int port;
  if (address == null) {
    // fall back to ozone.scm.names for non-ha
    Collection<String> scmAddresses =
        conf.getTrimmedStringCollection(OZONE_SCM_NAMES);
    if (scmAddresses.isEmpty()) {
      throw new ConfigurationException("Ozone scm client address is not " +
          "set. Configure one of these config " +
          OZONE_SCM_CLIENT_ADDRESS_KEY + ", " + OZONE_SCM_NAMES);
    }
    if (scmAddresses.size() > 1) {
      throw new ConfigurationException("For non-HA SCM " + OZONE_SCM_NAMES
          + " should be set with single address");
    }
    address = scmAddresses.iterator().next();
    port = conf.getInt(OZONE_SCM_CLIENT_PORT_KEY,
        OZONE_SCM_CLIENT_PORT_DEFAULT);
  } else {
    port = getHostPort(address)
        .orElse(conf.getInt(OZONE_SCM_CLIENT_PORT_KEY,
            OZONE_SCM_CLIENT_PORT_DEFAULT));
  }

  // An empty value or a value such as ":9860" has no host part; report it as
  // a configuration problem instead of failing inside Optional.get().
  final Optional<String> host = getHostName(address);
  if (!host.isPresent()) {
    throw new ConfigurationException("Invalid SCM client address: '"
        + address + "'. Expected host or host:port");
  }
  return Collections.singletonList(
      NetUtils.createSocketAddr(host.get() + ":" + port));
}
/**
 * Look up a hostname by probing the given configuration keys in order.
 * Every value may be missing; a present value is expected to look like
 * {@code host} or {@code host:port}.
 *
 * @param conf configuration to read from
 * @param keys configuration key names, tried from first to last
 * @return the host part of the first key that yields one, otherwise an
 *     empty {@code Optional}
 */
public static Optional<String> getHostNameFromConfigKeys(
    ConfigurationSource conf,
    String... keys) {
  for (int i = 0; i < keys.length; i++) {
    final String configured = conf.getTrimmed(keys[i]);
    final Optional<String> candidate = getHostName(configured);
    if (candidate.isPresent()) {
      return candidate;
    }
  }
  return Optional.empty();
}
/**
 * Extract the host part of a {@code host} or {@code host:port} string.
 * A trailing {@code :<digits>} suffix is stripped; whatever remains is the
 * hostname.
 *
 * @param value host or host:port, may be {@code null}
 * @return the hostname, or an empty {@code Optional} when {@code value} is
 *     null, empty, or consists of a port suffix only
 */
public static Optional<String> getHostName(String value) {
  if (value == null || value.isEmpty()) {
    return Optional.empty();
  }
  final String withoutPort = value.replaceAll("\\:[0-9]+$", "");
  // Nothing left after removing the port means there was no host at all.
  return Optional.of(withoutPort).filter(host -> !host.isEmpty());
}
/**
 * Extract the port from a {@code host:port} string.
 * Parsing is delegated to {@link HostAndPort#fromString(String)}.
 *
 * @param value string in host or host:port format, may be {@code null}
 * @return the port, or an empty {@code OptionalInt} when {@code value} is
 *     null, empty, or carries no port
 */
public static OptionalInt getHostPort(String value) {
  if (value == null || value.isEmpty()) {
    return OptionalInt.empty();
  }
  // NO_PORT comes back only when the parsed value has no port component.
  final int parsed = HostAndPort.fromString(value).getPortOrDefault(NO_PORT);
  return parsed != NO_PORT ? OptionalInt.of(parsed) : OptionalInt.empty();
}
/**
 * Look up an integer by probing the given configuration keys in order.
 * The first key that has a (non-null) value decides the result; later keys
 * are not consulted.
 *
 * @param conf configuration to read from
 * @param keys configuration key names, tried from first to last
 * @return the parsed value of the first key that is set, otherwise an empty
 *     {@code OptionalInt}
 * @throws NumberFormatException if that value is not a parsable integer
 */
public static OptionalInt getNumberFromConfigKeys(
    ConfigurationSource conf, String... keys) {
  for (int i = 0; i < keys.length; i++) {
    final String raw = conf.getTrimmed(keys[i]);
    if (raw == null) {
      continue;
    }
    return OptionalInt.of(Integer.parseInt(raw));
  }
  return OptionalInt.empty();
}
/**
 * Look up a port number by probing the given configuration keys in order.
 * Every value may be missing; a present value is expected to look like
 * {@code host} or {@code host:port}, and only values that actually carry a
 * port are considered a match.
 *
 * @param conf configuration to read from
 * @param keys configuration key names, tried from first to last
 * @return the port of the first key whose value contains one, otherwise an
 *     empty {@code OptionalInt}
 * @throws IllegalArgumentException presumably raised by the underlying
 *     {@code HostAndPort} parser for malformed values — confirm against the
 *     Guava documentation
 */
public static OptionalInt getPortNumberFromConfigKeys(
    ConfigurationSource conf, String... keys) {
  for (int i = 0; i < keys.length; i++) {
    final OptionalInt candidate = getHostPort(conf.getTrimmed(keys[i]));
    if (candidate.isPresent()) {
      return candidate;
    }
  }
  return OptionalInt.empty();
}
/**
 * Retrieve the socket addresses of all storage container managers that
 * datanodes should talk to.
 *
 * <p>If an SCM service id is configured, the datanode address of every
 * configured SCM node is used. Otherwise the entries of
 * {@code OZONE_SCM_NAMES} are parsed as {@code host[:port]}, with
 * {@code OZONE_SCM_DATANODE_PORT_KEY} (or its default) supplying a missing
 * port.
 *
 * @param conf configuration to read the SCM settings from
 * @return a collection of SCM addresses
 * @throws IllegalArgumentException if {@code OZONE_SCM_NAMES} is empty or
 *     one of its entries has no host part
 */
public static Collection<InetSocketAddress> getSCMAddressForDatanodes(
    ConfigurationSource conf) {
  // HA style configuration wins when it is present.
  if (SCMHAUtils.getScmServiceId(conf) != null) {
    final List<SCMNodeInfo> nodes = SCMNodeInfo.buildNodeInfo(conf);
    final Collection<InetSocketAddress> haAddresses =
        new HashSet<>(nodes.size());
    for (SCMNodeInfo node : nodes) {
      final InetSocketAddress datanodeEndpoint =
          NetUtils.createSocketAddr(node.getScmDatanodeAddress());
      haAddresses.add(datanodeEndpoint);
    }
    return haAddresses;
  }

  // No service id: fall back to OZONE_SCM_NAMES.
  final Collection<String> names =
      conf.getTrimmedStringCollection(ScmConfigKeys.OZONE_SCM_NAMES);
  if (names.isEmpty()) {
    throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_NAMES
        + " need to be a set of valid DNS names or IP addresses."
        + " Empty address list found.");
  }

  final Collection<InetSocketAddress> resolved = new HashSet<>(names.size());
  for (String entry : names) {
    final Optional<String> host = getHostName(entry);
    if (!host.isPresent()) {
      throw new IllegalArgumentException("Invalid hostname for SCM: "
          + entry);
    }
    final int port = getHostPort(entry)
        .orElse(conf.getInt(OZONE_SCM_DATANODE_PORT_KEY,
            OZONE_SCM_DATANODE_PORT_DEFAULT));
    resolved.add(NetUtils.createSocketAddr(host.get(), port));
  }

  // Several SCMs listed through the legacy key: nudge towards the HA keys.
  if (resolved.size() > 1) {
    LOG.warn("When SCM HA is configured, configure {} appended with " +
        "serviceId and nodeId. {} is deprecated.", OZONE_SCM_ADDRESS_KEY,
        OZONE_SCM_NAMES);
  }
  return resolved;
}
/**
 * Retrieve the socket address of Recon as configured under
 * {@code OZONE_RECON_ADDRESS_KEY}. A value without a port uses
 * {@code OZONE_RECON_DATANODE_PORT_DEFAULT}.
 *
 * @param conf configuration to read the Recon address from
 * @return the Recon address, or {@code null} when the key is unset or empty
 * @throws IllegalArgumentException if the configured value has no host part
 */
public static InetSocketAddress getReconAddresses(
    ConfigurationSource conf) {
  final String reconAddress = conf.get(OZONE_RECON_ADDRESS_KEY);
  if (StringUtils.isEmpty(reconAddress)) {
    return null;
  }
  final String host = getHostName(reconAddress).orElseThrow(
      () -> new IllegalArgumentException("Invalid hostname for Recon: "
          + reconAddress));
  final int port =
      getHostPort(reconAddress).orElse(OZONE_RECON_DATANODE_PORT_DEFAULT);
  return NetUtils.createSocketAddr(host, port);
}
/**
 * Returns the hostname for this datanode. If the hostname is not
 * explicitly configured under {@code DFS_DATANODE_HOST_NAME_KEY}, it is
 * determined via {@link DNS#getDefaultHost}.
 *
 * <p>The DNS interface and name server are taken from the
 * {@code HADOOP_SECURITY_DNS_*} keys; if the interface key is unset, the
 * legacy {@code DFS_DATANODE_DNS_*} keys are consulted instead.
 *
 * @param conf Configuration
 *
 * @return the hostname (NB: may not be a FQDN)
 * @throws UnknownHostException if the dfs.datanode.dns.interface
 *    option is used and the hostname can not be determined
 */
public static String getHostName(ConfigurationSource conf)
    throws UnknownHostException {
  String name = conf.get(DFS_DATANODE_HOST_NAME_KEY);
  if (name == null) {
    String dnsInterface = conf.get(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_DNS_INTERFACE_KEY);
    String nameServer = conf.get(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_DNS_NAMESERVER_KEY);
    boolean fallbackToHosts = false;
    if (dnsInterface == null) {
      // Try the legacy configuration keys.
      dnsInterface = conf.get(DFS_DATANODE_DNS_INTERFACE_KEY);
      nameServer = conf.get(DFS_DATANODE_DNS_NAMESERVER_KEY);
    } else {
      // If HADOOP_SECURITY_DNS_* is set then also attempt hosts file
      // resolution if DNS fails. We will not use hosts file resolution
      // by default to avoid breaking existing clusters.
      fallbackToHosts = true;
    }
    name = DNS.getDefaultHost(dnsInterface, nameServer, fallbackToHosts);
  }
  return name;
}
/**
 * Build the socket address used by the Datanode service endpoint.
 * The host comes from {@code HDDS_DATANODE_CLIENT_BIND_HOST_KEY} (or its
 * default); the port comes from the port part of
 * {@code HDDS_DATANODE_CLIENT_ADDRESS_KEY}, falling back to
 * {@code HDDS_DATANODE_CLIENT_PORT_KEY} (or its default).
 *
 * @param conf configuration to read the datanode settings from
 * @return Target InetSocketAddress for the Datanode service endpoint.
 */
public static InetSocketAddress
    getDatanodeRpcAddress(ConfigurationSource conf) {
  final Optional<String> bindHost = getHostNameFromConfigKeys(conf,
      HDDS_DATANODE_CLIENT_BIND_HOST_KEY);
  final String host = bindHost.orElse(HDDS_DATANODE_CLIENT_BIND_HOST_DEFAULT);

  final OptionalInt addressPort = getPortNumberFromConfigKeys(conf,
      HDDS_DATANODE_CLIENT_ADDRESS_KEY);
  final int port = addressPort.orElse(
      conf.getInt(HDDS_DATANODE_CLIENT_PORT_KEY,
          HDDS_DATANODE_CLIENT_PORT_DEFAULT));

  return NetUtils.createSocketAddr(host + ":" + port);
}
/**
 * Tells whether a container command only reads data.
 *
 * @param proto ContainerCommand Request proto
 * @return {@code true} for the read-style command types enumerated below,
 *     {@code false} for every other command type
 */
public static boolean isReadOnly(
    ContainerCommandRequestProtoOrBuilder proto) {
  switch (proto.getCmdType()) {
  case GetBlock:
  case GetCommittedBlockLength:
  case GetSmallFile:
  case ListBlock:
  case ListChunk:
  case ListContainer:
  case ReadChunk:
  case ReadContainer:
    return true;
  default:
    // Everything else is treated as a mutating command, e.g. CloseContainer,
    // WriteChunk, UpdateContainer, CompactChunk, CreateContainer,
    // DeleteChunk, DeleteContainer, DeleteBlock, PutBlock, PutSmallFile.
    return false;
  }
}
/**
 * Tells whether a container in the given state accepts writes, i.e. the
 * state is {@code OPEN} or {@code RECOVERING}.
 *
 * @param state container state, {@code null} yields {@code false}
 * @return {@code true} for {@code OPEN} and {@code RECOVERING}
 */
public static boolean isOpenToWriteState(State state) {
  if (state == State.OPEN) {
    return true;
  }
  return state == State.RECOVERING;
}
/**
 * Decides whether a datanode container command carries an Ozone block
 * token that has to be checked.
 * Not every command of the datanode container protocol embeds a block
 * token: block tokens are issued by Ozone Manager and handed to the Ozone
 * client so it can read/write data on the datanode via input/output
 * streams. The datanode uses this helper to pick the commands to verify.
 *
 * @param cmdType type of the container command
 * @return true if it is a cmd that block token should be checked when
 *     security is enabled, false if block token does not apply to the
 *     command.
 */
public static boolean requireBlockToken(
    ContainerProtos.Type cmdType) {
  final boolean blockTokenApplies;
  switch (cmdType) {
  case DeleteBlock:
  case DeleteChunk:
  case GetBlock:
  case GetCommittedBlockLength:
  case GetSmallFile:
  case PutBlock:
  case PutSmallFile:
  case ReadChunk:
  case WriteChunk:
    blockTokenApplies = true;
    break;
  default:
    blockTokenApplies = false;
    break;
  }
  return blockTokenApplies;
}
/**
 * Container-level counterpart of the block token check: returns
 * {@code true} for the container command types listed below.
 * NOTE(review): presumably these are the commands whose container token is
 * verified when security is enabled — confirm against the callers.
 *
 * @param cmdType type of the container command
 * @return {@code true} for CloseContainer, CreateContainer,
 *     DeleteContainer, ReadContainer, UpdateContainer and ListBlock,
 *     {@code false} otherwise
 */
public static boolean requireContainerToken(
    ContainerProtos.Type cmdType) {
  final boolean containerTokenApplies;
  switch (cmdType) {
  case CloseContainer:
  case CreateContainer:
  case DeleteContainer:
  case ReadContainer:
  case UpdateContainer:
  case ListBlock:
    containerTokenApplies = true;
    break;
  default:
    containerTokenApplies = false;
    break;
  }
  return containerTokenApplies;
}
/**
 * Extract the block ID from a block-related container command.
 *
 * @param msg container command
 * @return the block ID, or {@code null} when the command type is not
 *     block-related or the matching sub-message is not set
 */
public static BlockID getBlockID(ContainerCommandRequestProtoOrBuilder msg) {
  final ContainerProtos.DatanodeBlockID protoId;
  switch (msg.getCmdType()) {
  case DeleteBlock:
    protoId = msg.hasDeleteBlock()
        ? msg.getDeleteBlock().getBlockID() : null;
    break;
  case DeleteChunk:
    protoId = msg.hasDeleteChunk()
        ? msg.getDeleteChunk().getBlockID() : null;
    break;
  case GetBlock:
    protoId = msg.hasGetBlock()
        ? msg.getGetBlock().getBlockID() : null;
    break;
  case GetCommittedBlockLength:
    protoId = msg.hasGetCommittedBlockLength()
        ? msg.getGetCommittedBlockLength().getBlockID() : null;
    break;
  case GetSmallFile:
    protoId = msg.hasGetSmallFile()
        ? msg.getGetSmallFile().getBlock().getBlockID() : null;
    break;
  case ListChunk:
    protoId = msg.hasListChunk()
        ? msg.getListChunk().getBlockID() : null;
    break;
  case PutBlock:
    protoId = msg.hasPutBlock()
        ? msg.getPutBlock().getBlockData().getBlockID() : null;
    break;
  case PutSmallFile:
    protoId = msg.hasPutSmallFile()
        ? msg.getPutSmallFile().getBlock().getBlockData().getBlockID()
        : null;
    break;
  case ReadChunk:
    protoId = msg.hasReadChunk()
        ? msg.getReadChunk().getBlockID() : null;
    break;
  case WriteChunk:
    protoId = msg.hasWriteChunk()
        ? msg.getWriteChunk().getBlockID() : null;
    break;
  default:
    // Command types without a block ID.
    protoId = null;
    break;
  }
  if (protoId == null) {
    return null;
  }
  return BlockID.getFromProtobuf(protoId);
}
/**
 * Register the given MBean, attaching extra JMX ObjectName properties when
 * the available {@link MBeans} class supports it.
 * The four-argument {@code MBeans.register} overload is looked up
 * reflectively; if the lookup or the invocation fails, the MBean is
 * registered through the three-argument overload without the properties.
 *
 * @param serviceName - see {@link MBeans#register}
 * @param mBeanName - see {@link MBeans#register}
 * @param jmxProperties - additional JMX ObjectName properties.
 * @param mBean - the MBean to register.
 * @return the name used to register the MBean.
 */
public static ObjectName registerWithJmxProperties(
    String serviceName, String mBeanName, Map<String, String> jmxProperties,
    Object mBean) {
  try {
    // Probe for the overload that accepts additional properties.
    final Method withProperties = MBeans.class.getMethod(
        "register", String.class, String.class,
        Map.class, Object.class);
    final Object registered = withProperties.invoke(
        null, serviceName, mBeanName, jmxProperties, mBean);
    return (ObjectName) registered;
  } catch (NoSuchMethodException | IllegalAccessException |
      InvocationTargetException e) {
    // Fallback: plain registration without the extra properties.
    LOG.trace("Registering MBean {} without additional properties {}",
        mBeanName, jmxProperties);
    return MBeans.register(serviceName, mBeanName, mBean);
  }
}
/**
 * Wall-clock time as reported by {@link System#currentTimeMillis()}.
 *
 * @return the current time in milliseconds.
 */
public static long getTime() {
  final long nowMillis = System.currentTimeMillis();
  return nowMillis;
}
/**
 * Basic sanity check for {@code path}: after normalization it has to be
 * located at or below the (normalized) {@code ancestor}.
 *
 * @param path the path to be validated
 * @param ancestor a trusted path that is supposed to be the ancestor of
 *     {@code path}
 * @throws NullPointerException if either {@code path} or {@code ancestor} is
 *     null
 * @throws IllegalArgumentException if {@code ancestor} is not really the
 *     ancestor of {@code path}
 */
public static void validatePath(Path path, Path ancestor) {
  Preconditions.checkNotNull(path,
      "Path should not be null");
  Preconditions.checkNotNull(ancestor,
      "Ancestor should not be null");
  // Normalize both sides so that ".." segments cannot escape the ancestor.
  final Path normalizedPath = path.normalize();
  final Path normalizedAncestor = ancestor.normalize();
  final boolean isDescendant = normalizedPath.startsWith(normalizedAncestor);
  Preconditions.checkArgument(isDescendant,
      "Path should be a descendant of %s", ancestor);
}
/**
 * Create the directory denoted by {@code dirPath}, including any missing
 * parent directories. An already existing directory is accepted as is.
 *
 * @param dirPath path of the directory to create
 * @return the {@code File} for {@code dirPath}
 * @throws IllegalArgumentException if the directory could not be created,
 *     including the case where the path exists but is not a directory
 */
public static File createDir(String dirPath) {
  File dirFile = new File(dirPath);
  // mkdirs() returns false both on failure and when the path already
  // exists; only an existing *directory* counts as success.
  if (!dirFile.mkdirs() && !dirFile.isDirectory()) {
    throw new IllegalArgumentException("Unable to create path: " + dirFile);
  }
  return dirFile;
}
/**
 * Fetch a credential through {@code conf.getPassword(alias)}.
 * NOTE(review): per the original description this consults the
 * CredentialProvider API first and falls back to clear text in the config
 * if that is allowed — that behavior lives in the configuration class, not
 * in this method.
 *
 * @param conf Configuration instance
 * @param alias name of the credential to retrieve
 * @return the credential value, or {@code null} when none is found or an
 *     {@code IOException} occurs during the lookup (a warning is logged)
 */
static String getPassword(ConfigurationSource conf, String alias) {
  try {
    final char[] secret = conf.getPassword(alias);
    return secret == null ? null : new String(secret);
  } catch (IOException ioe) {
    LOG.warn("Setting password to null since IOException is caught"
        + " when getting password", ioe);
    return null;
  }
}
/**
 * Utility string formatter method to display SCM roles.
 * Each entry is split on {@code ':'} and its first three fields are
 * rendered as host name, Ratis port and role.
 *
 * @param nodes entries of the form {@code host:ratisPort:role}
 * @return the rendered entries concatenated in order, each followed by a
 *     single space; an empty string for an empty list
 */
public static String format(List<String> nodes) {
  final StringBuilder rendered = new StringBuilder();
  for (final String entry : nodes) {
    final String[] fields = entry.split(":");
    final String host = fields[0];
    final String ratisPort = fields[1];
    final String role = fields[2];
    rendered.append(String.format(
        "{ HostName : %s, Ratis Port : %s, Role : %s } ",
        host, ratisPort, role));
  }
  return rendered.toString();
}
/**
 * Return the Ozone service shutdown timeout as configured in
 * {@link OzoneServiceConfig}.
 *
 * @param conf configuration to build the {@code OzoneServiceConfig} from
 * @return the service shutdown timeout (the unit is whatever
 *     {@code OzoneServiceConfig#getServiceShutdownTimeout()} reports)
 */
public static long getShutDownTimeOut(ConfigurationSource conf) {
  final OzoneServiceConfig serviceConfig =
      conf.getObject(OzoneServiceConfig.class);
  return serviceConfig.getServiceShutdownTimeout();
}
/**
 * Convert a byte count to megabytes, rounding up to the next whole MB.
 *
 * @param bytes number of bytes
 * @return {@code bytes / ONE_MB}, rounded up and narrowed to {@code int}
 */
public static int roundupMb(long bytes) {
  final double megabytes = (double) bytes / (double) ONE_MB;
  return (int) Math.ceil(megabytes);
}
/**
 * Dig through a {@link ServiceException} looking for an access control
 * problem ({@link AccessControlException} or
 * {@link SecretManager.InvalidToken}) or a {@link RpcException}.
 * A {@link RemoteException} cause is unwrapped first; then the cause chain
 * is walked until one of those types is found.
 *
 * @param ex the exception to inspect
 * @return the first matching throwable in the cause chain, or {@code null}
 *     when {@code ex} is not a {@code ServiceException} or nothing matches
 */
public static Throwable getUnwrappedException(Exception ex) {
  if (!(ex instanceof ServiceException)) {
    return null;
  }
  Throwable start = ex.getCause();
  if (start instanceof RemoteException) {
    start = ((RemoteException) start).unwrapRemoteException();
  }
  for (Throwable current = start; current != null;
      current = current.getCause()) {
    final boolean interesting = current instanceof RpcException
        || current instanceof AccessControlException
        || current instanceof SecretManager.InvalidToken;
    if (interesting) {
      return current;
    }
  }
  return null;
}
/**
 * For some Rpc Exceptions, client should not failover.
 *
 * @param exception the exception raised by the RPC call
 * @return {@code true} when {@code exception} is an {@link RpcException}
 *     that is a {@link RpcNoSuchMethodException},
 *     {@link RpcNoSuchProtocolException} or {@link RPC.VersionMismatch}, or
 *     whose message reports an oversized or invalid-length RPC response;
 *     {@code false} otherwise (including an {@code RpcException} without a
 *     message)
 */
public static boolean shouldNotFailoverOnRpcException(Throwable exception) {
  if (!(exception instanceof RpcException)) {
    return false;
  }
  // Should not failover for following exceptions
  if (exception instanceof RpcNoSuchMethodException ||
      exception instanceof RpcNoSuchProtocolException ||
      exception instanceof RPC.VersionMismatch) {
    return true;
  }
  // getMessage() may legitimately be null; treat that as "no match" rather
  // than failing with a NullPointerException.
  final String message = exception.getMessage();
  return message != null
      && (message.contains("RPC response exceeds maximum data length")
          || message.contains("RPC response has invalid length"));
}
/**
 * Strip binary payloads from request {@code msg} so it can be printed.
 * The data of a WriteChunk or PutSmallFile sub-message is replaced with
 * {@link #REDACTED}. (May be incomplete, feel free to add any missing
 * cleanups.)
 *
 * @param msg the request, may be {@code null}
 * @return {@code null} for a {@code null} input, a redacted copy when the
 *     request carries WriteChunk or PutSmallFile, otherwise {@code msg}
 *     itself
 */
public static ContainerProtos.ContainerCommandRequestProto processForDebug(
    ContainerProtos.ContainerCommandRequestProto msg) {
  if (msg == null) {
    return null;
  }
  final boolean writesChunk = msg.hasWriteChunk();
  final boolean putsSmallFile = msg.hasPutSmallFile();
  if (!writesChunk && !putsSmallFile) {
    // Nothing binary inside: hand back the original instance.
    return msg;
  }
  final ContainerProtos.ContainerCommandRequestProto.Builder redacted =
      msg.toBuilder();
  if (writesChunk) {
    redacted.getWriteChunkBuilder().setData(REDACTED);
  }
  if (putsSmallFile) {
    redacted.getPutSmallFileBuilder().setData(REDACTED);
  }
  return redacted.build();
}
/**
 * Strip binary payloads from response {@code msg} so it can be printed.
 * For ReadChunk and GetSmallFile responses, both the single data field and
 * the list of data buffers are replaced with {@link #REDACTED} when they
 * are set. (May be incomplete, feel free to add any missing cleanups.)
 *
 * @param msg the response, may be {@code null}
 * @return {@code null} for a {@code null} input, a rebuilt copy when the
 *     response carries ReadChunk or GetSmallFile, otherwise {@code msg}
 *     itself
 */
public static ContainerProtos.ContainerCommandResponseProto processForDebug(
    ContainerProtos.ContainerCommandResponseProto msg) {
  if (msg == null) {
    return null;
  }
  final boolean readsChunk = msg.hasReadChunk();
  final boolean getsSmallFile = msg.hasGetSmallFile();
  if (!readsChunk && !getsSmallFile) {
    // Nothing binary inside: hand back the original instance.
    return msg;
  }

  final ContainerProtos.ContainerCommandResponseProto.Builder redacted =
      msg.toBuilder();

  if (readsChunk) {
    if (msg.getReadChunk().hasData()) {
      redacted.getReadChunkBuilder().setData(REDACTED);
    }
    if (msg.getReadChunk().hasDataBuffers()) {
      // Collapse the buffer list into a single placeholder entry.
      redacted.getReadChunkBuilder().getDataBuffersBuilder()
          .clearBuffers()
          .addBuffers(REDACTED);
    }
  }

  if (getsSmallFile) {
    if (msg.getGetSmallFile().getData().hasData()) {
      redacted.getGetSmallFileBuilder().getDataBuilder().setData(REDACTED);
    }
    if (msg.getGetSmallFile().getData().hasDataBuffers()) {
      redacted.getGetSmallFileBuilder().getDataBuilder()
          .getDataBuffersBuilder()
          .clearBuffers()
          .addBuffers(REDACTED);
    }
  }

  return redacted.build();
}
/**
 * Prepare the Ozone properties of a configuration for logging: every value
 * is passed through a {@link ConfigRedactor}, trimmed when non-null, and
 * the entries are collected in key order.
 *
 * @param conf OzoneConfiguration object to be printed.
 * @return map of property name to redacted value, sorted by key
 */
public static Map<String, String> processForLogging(OzoneConfiguration conf) {
  final Map<String, String> rawProps = conf.getOzoneProperties();
  final ConfigRedactor redactor = new ConfigRedactor(conf);
  // TreeMap keeps the output ordered by property name.
  final Map<String, String> sorted = new TreeMap<>();
  for (Map.Entry<String, String> prop : rawProps.entrySet()) {
    final String key = prop.getKey();
    final String redacted = redactor.redact(key, prop.getValue());
    sorted.put(key, redacted == null ? null : redacted.trim());
  }
  return sorted;
}
/**
 * Run {@code supplier} and make sure the current thread keeps its name
 * (workaround for HADOOP-18433). If the name differs once the supplier has
 * finished — normally or with an exception — it is reset to the value it
 * had on entry and the restoration is logged.
 *
 * @param supplier the code to execute
 * @param <T> result type of the supplier
 * @param <E> exception type the supplier may throw
 * @return whatever {@code supplier} returns
 * @throws E if {@code supplier} throws
 */
public static <T, E extends IOException> T preserveThreadName(
    CheckedSupplier<T, E> supplier) throws E {
  final Thread current = Thread.currentThread();
  final String originalName = current.getName();
  try {
    return supplier.get();
  } finally {
    final boolean renamed = !Objects.equals(originalName, current.getName());
    if (renamed) {
      LOG.info("Restoring thread name: {}", originalName);
      current.setName(originalName);
    }
  }
}
/**
 * Concatenate stack trace {@code elements} (one per line) starting at
 * {@code startIndex}.
 *
 * @param elements stack trace to format, may be {@code null}
 * @param startIndex index of the first element to include; negative values
 *     are treated as {@code 0}
 * @return the selected elements, each followed by {@code "\n"}; an empty
 *     string when {@code elements} is {@code null} or {@code startIndex} is
 *     at or past the end of the array
 */
public static @Nonnull String formatStackTrace(
    @Nullable StackTraceElement[] elements, int startIndex) {
  if (elements == null) {
    return "";
  }
  // Clamp so that a negative start cannot index before the array.
  final int first = Math.max(startIndex, 0);
  if (first >= elements.length) {
    return "";
  }
  final StringBuilder sb = new StringBuilder();
  for (int line = first; line < elements.length; line++) {
    sb.append(elements[line]).append("\n");
  }
  return sb.toString();
}
/**
 * Capture the stack trace of the current thread, but only when it is going
 * to be used, i.e. when {@code logger} has debug enabled.
 *
 * @param logger logger whose debug level decides whether to capture
 * @return current thread stack trace if {@code logger} has debug enabled,
 *     {@code null} otherwise
 */
public static @Nullable StackTraceElement[] getStackTrace(
    @Nonnull Logger logger) {
  if (!logger.isDebugEnabled()) {
    return null;
  }
  return Thread.currentThread().getStackTrace();
}
}