blob: 8e129c9d2301157987f05707e1b4a440cb8d6c3b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone;
import com.google.common.base.Joiner;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.stream.Collectors;
import com.google.common.base.Strings;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_BIND_HOST_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTPS_BIND_HOST_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTPS_BIND_PORT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_PORT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_PORT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Stateless helper functions for the server and client side of OM
* communication.
*/
public final class OmUtils {
public static final Logger LOG = LoggerFactory.getLogger(OmUtils.class);
private static final SecureRandom SRAND = new SecureRandom();
private static byte[] randomBytes = new byte[32];
private OmUtils() {
}
/**
* Retrieve the socket address that is used by OM.
* @param conf
* @return Target InetSocketAddress for the SCM service endpoint.
*/
public static InetSocketAddress getOmAddress(Configuration conf) {
return NetUtils.createSocketAddr(getOmRpcAddress(conf));
}
/**
* Retrieve the socket address that is used by OM.
* @param conf
* @return Target InetSocketAddress for the SCM service endpoint.
*/
public static String getOmRpcAddress(Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
OZONE_OM_ADDRESS_KEY);
return host.orElse(OZONE_OM_BIND_HOST_DEFAULT) + ":" +
getOmRpcPort(conf);
}
/**
* Retrieve the socket address that is used by OM as specified by the confKey.
* Return null if the specified conf key is not set.
* @param conf configuration
* @param confKey configuration key to lookup address from
* @return Target InetSocketAddress for the OM RPC server.
*/
public static String getOmRpcAddress(Configuration conf, String confKey) {
final Optional<String> host = getHostNameFromConfigKeys(conf, confKey);
if (host.isPresent()) {
return host.get() + ":" + getOmRpcPort(conf, confKey);
} else {
// The specified confKey is not set
return null;
}
}
/**
* Retrieve the socket address that should be used by clients to connect
* to OM.
* @param conf
* @return Target InetSocketAddress for the OM service endpoint.
*/
public static InetSocketAddress getOmAddressForClients(
Configuration conf) {
final Optional<String> host = getHostNameFromConfigKeys(conf,
OZONE_OM_ADDRESS_KEY);
if (!host.isPresent()) {
throw new IllegalArgumentException(
OZONE_OM_ADDRESS_KEY + " must be defined. See" +
" https://wiki.apache.org/hadoop/Ozone#Configuration for" +
" details on configuring Ozone.");
}
return NetUtils.createSocketAddr(
host.get() + ":" + getOmRpcPort(conf));
}
/**
* Returns true if OZONE_OM_SERVICE_IDS_KEY is defined and not empty.
* @param conf Configuration
* @return true if OZONE_OM_SERVICE_IDS_KEY is defined and not empty;
* else false.
*/
public static boolean isServiceIdsDefined(Configuration conf) {
String val = conf.get(OZONE_OM_SERVICE_IDS_KEY);
return val != null && val.length() > 0;
}
/**
* Returns true if HA for OzoneManager is configured for the given service id.
* @param conf Configuration
* @param serviceId OM HA cluster service ID
* @return true if HA is configured in the configuration; else false.
*/
public static boolean isOmHAServiceId(Configuration conf, String serviceId) {
Collection<String> omServiceIds = conf.getTrimmedStringCollection(
OZONE_OM_SERVICE_IDS_KEY);
return omServiceIds.contains(serviceId);
}
public static int getOmRpcPort(Configuration conf) {
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
OZONE_OM_ADDRESS_KEY);
return port.orElse(OZONE_OM_PORT_DEFAULT);
}
/**
* Retrieve the port that is used by OM as specified by the confKey.
* Return default port if port is not specified in the confKey.
* @param conf configuration
* @param confKey configuration key to lookup address from
* @return Port on which OM RPC server will listen on
*/
public static int getOmRpcPort(Configuration conf, String confKey) {
// If no port number is specified then we'll just try the defaultBindPort.
final Optional<Integer> port = getPortNumberFromConfigKeys(conf, confKey);
return port.orElse(OZONE_OM_PORT_DEFAULT);
}
public static int getOmRestPort(Configuration conf) {
// If no port number is specified then we'll just try the default
// HTTP BindPort.
final Optional<Integer> port =
getPortNumberFromConfigKeys(conf, OZONE_OM_HTTP_ADDRESS_KEY);
return port.orElse(OZONE_OM_HTTP_BIND_PORT_DEFAULT);
}
/**
* Get the location where OM should store its metadata directories.
* Fall back to OZONE_METADATA_DIRS if not defined.
*
* @param conf - Config
* @return File path, after creating all the required Directories.
*/
public static File getOmDbDir(Configuration conf) {
return ServerUtils.getDBPath(conf, OMConfigKeys.OZONE_OM_DB_DIRS);
}
/**
* Checks if the OM request is read only or not.
* @param omRequest OMRequest proto
* @return True if its readOnly, false otherwise.
*/
public static boolean isReadOnly(
OzoneManagerProtocolProtos.OMRequest omRequest) {
OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
switch (cmdType) {
case CheckVolumeAccess:
case InfoVolume:
case ListVolume:
case InfoBucket:
case ListBuckets:
case LookupKey:
case ListKeys:
case InfoS3Bucket:
case ListS3Buckets:
case ServiceList:
case ListMultiPartUploadParts:
case GetFileStatus:
case LookupFile:
case ListStatus:
case GetAcl:
case DBUpdates:
case ListMultipartUploads:
return true;
case CreateVolume:
case SetVolumeProperty:
case DeleteVolume:
case CreateBucket:
case SetBucketProperty:
case DeleteBucket:
case CreateKey:
case RenameKey:
case DeleteKey:
case CommitKey:
case AllocateBlock:
case CreateS3Bucket:
case DeleteS3Bucket:
case InitiateMultiPartUpload:
case CommitMultiPartUpload:
case CompleteMultiPartUpload:
case AbortMultiPartUpload:
case GetS3Secret:
case GetDelegationToken:
case RenewDelegationToken:
case CancelDelegationToken:
case CreateDirectory:
case CreateFile:
case RemoveAcl:
case SetAcl:
case AddAcl:
case PurgeKeys:
return false;
default:
LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
return false;
}
}
public static byte[] getMD5Digest(String input) throws IOException {
try {
MessageDigest md = MessageDigest.getInstance(OzoneConsts.MD5_HASH);
return md.digest(input.getBytes(StandardCharsets.UTF_8));
} catch (NoSuchAlgorithmException ex) {
throw new IOException("Error creating an instance of MD5 digest.\n" +
"This could possibly indicate a faulty JRE");
}
}
public static byte[] getSHADigest() throws IOException {
try {
SRAND.nextBytes(randomBytes);
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
return sha.digest(randomBytes);
} catch (NoSuchAlgorithmException ex) {
throw new IOException("Error creating an instance of SHA-256 digest.\n" +
"This could possibly indicate a faulty JRE");
}
}
/**
* Add non empty and non null suffix to a key.
*/
private static String addSuffix(String key, String suffix) {
if (suffix == null || suffix.isEmpty()) {
return key;
}
assert !suffix.startsWith(".") :
"suffix '" + suffix + "' should not already have '.' prepended.";
return key + "." + suffix;
}
/**
* Concatenate list of suffix strings '.' separated.
*/
private static String concatSuffixes(String... suffixes) {
if (suffixes == null) {
return null;
}
return Joiner.on(".").skipNulls().join(suffixes);
}
/**
* Return configuration key of format key.suffix1.suffix2...suffixN.
*/
public static String addKeySuffixes(String key, String... suffixes) {
String keySuffix = concatSuffixes(suffixes);
return addSuffix(key, keySuffix);
}
/**
* Match input address to local address.
* Return true if it matches, false otherwsie.
*/
public static boolean isAddressLocal(InetSocketAddress addr) {
return NetUtils.isLocalAddress(addr.getAddress());
}
/**
* Get a collection of all omNodeIds for the given omServiceId.
*/
public static Collection<String> getOMNodeIds(Configuration conf,
String omServiceId) {
String key = addSuffix(OZONE_OM_NODES_KEY, omServiceId);
return conf.getTrimmedStringCollection(key);
}
/**
* @return <code>coll</code> if it is non-null and non-empty. Otherwise,
* returns a list with a single null value.
*/
public static Collection<String> emptyAsSingletonNull(Collection<String>
coll) {
if (coll == null || coll.isEmpty()) {
return Collections.singletonList(null);
} else {
return coll;
}
}
/**
* Write OM DB Checkpoint to an output stream as a compressed file (tgz).
* @param checkpoint checkpoint file
* @param destination desination output stream.
* @throws IOException
*/
public static void writeOmDBCheckpointToStream(DBCheckpoint checkpoint,
OutputStream destination)
throws IOException {
try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
.createCompressorOutputStream(CompressorStreamFactory.GZIP,
destination)) {
try (ArchiveOutputStream archiveOutputStream =
new TarArchiveOutputStream(gzippedOut)) {
Path checkpointPath = checkpoint.getCheckpointLocation();
for (Path path : Files.list(checkpointPath)
.collect(Collectors.toList())) {
if (path != null) {
Path fileName = path.getFileName();
if (fileName != null) {
includeFile(path.toFile(), fileName.toString(),
archiveOutputStream);
}
}
}
}
} catch (CompressorException e) {
throw new IOException(
"Can't compress the checkpoint: " +
checkpoint.getCheckpointLocation(), e);
}
}
private static void includeFile(File file, String entryName,
ArchiveOutputStream archiveOutputStream)
throws IOException {
ArchiveEntry archiveEntry =
archiveOutputStream.createArchiveEntry(file, entryName);
archiveOutputStream.putArchiveEntry(archiveEntry);
try (FileInputStream fis = new FileInputStream(file)) {
IOUtils.copy(fis, archiveOutputStream);
}
archiveOutputStream.closeArchiveEntry();
}
/**
* If a OM conf is only set with key suffixed with OM Node ID, return the
* set value.
* @return if the value is set for key suffixed with OM Node ID, return the
* value, else return null.
*/
public static String getConfSuffixedWithOMNodeId(Configuration conf,
String confKey, String omServiceID, String omNodeId) {
String suffixedConfKey = OmUtils.addKeySuffixes(
confKey, omServiceID, omNodeId);
String confValue = conf.getTrimmed(suffixedConfKey);
if (StringUtils.isNotEmpty(confValue)) {
return confValue;
}
return null;
}
/**
* Returns the http address of peer OM node.
* @param conf Configuration
* @param omNodeId peer OM node ID
* @param omNodeHostAddr peer OM node host address
* @return http address of peer OM node in the format <hostName>:<port>
*/
public static String getHttpAddressForOMPeerNode(Configuration conf,
String omServiceId, String omNodeId, String omNodeHostAddr) {
final Optional<String> bindHost = getHostNameFromConfigKeys(conf,
addKeySuffixes(OZONE_OM_HTTP_BIND_HOST_KEY, omServiceId, omNodeId));
final Optional<Integer> addressPort = getPortNumberFromConfigKeys(conf,
addKeySuffixes(OZONE_OM_HTTP_ADDRESS_KEY, omServiceId, omNodeId));
final Optional<String> addressHost = getHostNameFromConfigKeys(conf,
addKeySuffixes(OZONE_OM_HTTP_ADDRESS_KEY, omServiceId, omNodeId));
String hostName = bindHost.orElse(addressHost.orElse(omNodeHostAddr));
return hostName + ":" + addressPort.orElse(OZONE_OM_HTTP_BIND_PORT_DEFAULT);
}
/**
* Returns the https address of peer OM node.
* @param conf Configuration
* @param omNodeId peer OM node ID
* @param omNodeHostAddr peer OM node host address
* @return https address of peer OM node in the format <hostName>:<port>
*/
public static String getHttpsAddressForOMPeerNode(Configuration conf,
String omServiceId, String omNodeId, String omNodeHostAddr) {
final Optional<String> bindHost = getHostNameFromConfigKeys(conf,
addKeySuffixes(OZONE_OM_HTTPS_BIND_HOST_KEY, omServiceId, omNodeId));
final Optional<Integer> addressPort = getPortNumberFromConfigKeys(conf,
addKeySuffixes(OZONE_OM_HTTPS_ADDRESS_KEY, omServiceId, omNodeId));
final Optional<String> addressHost = getHostNameFromConfigKeys(conf,
addKeySuffixes(OZONE_OM_HTTPS_ADDRESS_KEY, omServiceId, omNodeId));
String hostName = bindHost.orElse(addressHost.orElse(omNodeHostAddr));
return hostName + ":" +
addressPort.orElse(OZONE_OM_HTTPS_BIND_PORT_DEFAULT);
}
/**
* Get the local directory where ratis logs will be stored.
*/
public static String getOMRatisDirectory(Configuration conf) {
String storageDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_STORAGE_DIR);
if (Strings.isNullOrEmpty(storageDir)) {
storageDir = HddsServerUtil.getDefaultRatisDirectory(conf);
}
return storageDir;
}
public static String getOMRatisSnapshotDirectory(Configuration conf) {
String snapshotDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_DIR);
if (Strings.isNullOrEmpty(snapshotDir)) {
snapshotDir = Paths.get(getOMRatisDirectory(conf),
"snapshot").toString();
}
return snapshotDir;
}
public static File createOMDir(String dirPath) {
File dirFile = new File(dirPath);
if (!dirFile.exists() && !dirFile.mkdirs()) {
throw new IllegalArgumentException("Unable to create path: " + dirFile);
}
return dirFile;
}
/**
* Prepares key info to be moved to deletedTable.
* 1. It strips GDPR metadata from key info
* 2. For given object key, if the repeatedOmKeyInfo instance is null, it
* implies that no entry for the object key exists in deletedTable so we
* create a new instance to include this key, else we update the existing
* repeatedOmKeyInfo instance.
* @param keyInfo args supplied by client
* @param repeatedOmKeyInfo key details from deletedTable
* @return {@link RepeatedOmKeyInfo}
* @throws IOException if I/O Errors when checking for key
*/
public static RepeatedOmKeyInfo prepareKeyForDelete(OmKeyInfo keyInfo,
RepeatedOmKeyInfo repeatedOmKeyInfo) throws IOException{
// If this key is in a GDPR enforced bucket, then before moving
// KeyInfo to deletedTable, remove the GDPR related metadata from
// KeyInfo.
if(Boolean.valueOf(keyInfo.getMetadata().get(OzoneConsts.GDPR_FLAG))) {
keyInfo.getMetadata().remove(OzoneConsts.GDPR_FLAG);
keyInfo.getMetadata().remove(OzoneConsts.GDPR_ALGORITHM);
keyInfo.getMetadata().remove(OzoneConsts.GDPR_SECRET);
}
if(repeatedOmKeyInfo == null) {
//The key doesn't exist in deletedTable, so create a new instance.
repeatedOmKeyInfo = new RepeatedOmKeyInfo(keyInfo);
} else {
//The key exists in deletedTable, so update existing instance.
repeatedOmKeyInfo.addOmKeyInfo(keyInfo);
}
return repeatedOmKeyInfo;
}
}