// blob: d5fdba84633ac697031f2478e493bb4729854370 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
/**
* This manages erasure coding policies predefined and activated in the system.
* It loads customized policies and syncs with persisted ones in
* NameNode image.
*
* This class is instantiated by the FSNamesystem.
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
public final class ErasureCodingPolicyManager {
/** Logger for this class. */
public static Logger LOG = LoggerFactory.getLogger(
ErasureCodingPolicyManager.class);
/**
* Upper bound on a policy's cell size. Starts at the compiled-in default and
* is overwritten from the Configuration in init(). addPolicy() rejects
* policies above this bound and loadPolicy() marks them DISABLED.
*/
private int maxCellSize =
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_DEFAULT;
/**
* Whether addPolicy() accepts user defined policies. Starts at the
* compiled-in default and is overwritten from the Configuration in init().
*/
private boolean userDefinedAllowed =
DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_USERPOLICIES_ALLOWED_KEY_DEFAULT;
// Storage policy IDs accepted for striped EC files; consulted by
// checkStoragePolicySuitableForECStripedMode().
private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE =
new byte[]{
HdfsConstants.HOT_STORAGE_POLICY_ID,
HdfsConstants.COLD_STORAGE_POLICY_ID,
HdfsConstants.ALLSSD_STORAGE_POLICY_ID};
/**
* All policies sorted by name for fast querying, including built-in
* policies, user defined policies and removed policies.
*/
private Map<String, ErasureCodingPolicyInfo> policiesByName;
/**
* All policies sorted by ID for fast querying, including built-in policies,
* user defined policies and removed policies.
*/
private Map<Byte, ErasureCodingPolicyInfo> policiesByID;
/**
* Array view of the values of policiesByName, kept so that querying all
* policies does not have to rebuild the array on every call.
*/
private ErasureCodingPolicyInfo[] allPolicies;
/**
* All policies in the state in which they will be persisted in the fsimage.
*
* The difference between persisted policies and all policies is that
* if a default policy is only enabled at startup,
* it will appear as disabled in the persisted policy list and in the fsimage.
*/
private Map<Byte, ErasureCodingPolicyInfo> allPersistedPolicies;
/**
* All enabled policies sorted by name for fast querying, including built-in
* policies and user defined policies.
*/
private Map<String, ErasureCodingPolicy> enabledPoliciesByName;
/**
* Array view of the values of enabledPoliciesByName, kept so that querying
* all enabled policies does not have to rebuild the array on every call.
*/
private ErasureCodingPolicy[] enabledPolicies;
/**
* Name of the default policy as read from the Configuration by
* enableDefaultPolicy(); may be the empty string.
*/
private String defaultPolicyName;
/** Lazily created singleton handed out by getInstance(). */
private volatile static ErasureCodingPolicyManager instance = null;
/**
 * Get the process-wide manager, creating it on first use.
 *
 * Creation is guarded by double-checked locking on the volatile
 * {@code instance} field, so concurrent first callers all observe the same
 * object instead of racing to create (and overwrite) separate instances.
 *
 * @return the singleton manager, never null
 */
public static ErasureCodingPolicyManager getInstance() {
  // Read the volatile field once on the fast path.
  ErasureCodingPolicyManager current = instance;
  if (current == null) {
    synchronized (ErasureCodingPolicyManager.class) {
      // Re-check under the lock: another thread may have won the race.
      current = instance;
      if (current == null) {
        current = new ErasureCodingPolicyManager();
        instance = current;
      }
    }
  }
  return current;
}
// Private so that instances are only created through getInstance(); the
// policy tables are populated later by init().
private ErasureCodingPolicyManager() {}
/**
 * (Re)initialize the manager: reset all policy tables, register every
 * system built-in policy, enable the configured default policy and read the
 * cell size limit and the user-defined-policy switch from the configuration.
 *
 * @param conf configuration to read the EC settings from
 * @throws IOException if the configured default policy is not a known policy
 */
public void init(Configuration conf) throws IOException {
  policiesByName = new TreeMap<>();
  policiesByID = new TreeMap<>();
  enabledPoliciesByName = new TreeMap<>();
  allPersistedPolicies = new TreeMap<>();

  // TODO: load user defined EC policy from fsImage HDFS-7859
  // load persistent policies from image and editlog, which is done only once
  // during NameNode startup. This can be done here or in a separate method.

  // Register all system built-in policies. The persisted table gets its own
  // info object so that its state can diverge from the live one.
  for (ErasureCodingPolicy builtIn :
      SystemErasureCodingPolicies.getPolicies()) {
    final String policyName = builtIn.getName();
    final byte policyId = builtIn.getId();
    final ErasureCodingPolicyInfo liveInfo =
        new ErasureCodingPolicyInfo(builtIn);
    policiesByName.put(policyName, liveInfo);
    policiesByID.put(policyId, liveInfo);
    allPersistedPolicies.put(policyId, new ErasureCodingPolicyInfo(builtIn));
  }

  enableDefaultPolicy(conf);
  updatePolicies();

  maxCellSize = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_KEY,
      DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_MAX_CELLSIZE_DEFAULT);
  userDefinedAllowed = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_USERPOLICIES_ALLOWED_KEY,
      DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_USERPOLICIES_ALLOWED_KEY_DEFAULT);
}
/**
 * Get the enabled policies.
 *
 * The returned array is the manager's internal array, not a copy; use
 * {@link #getCopyOfEnabledPolicies()} when a private copy is needed.
 *
 * @return the currently enabled policies
 */
public ErasureCodingPolicy[] getEnabledPolicies() {
  return this.enabledPolicies;
}
/**
 * Look up an enabled policy by name.
 *
 * When no enabled policy has that name, the replication policy is returned
 * if {@code name} matches the replication policy name (case-insensitively).
 *
 * @param name policy name to look up
 * @return the matching policy, or null if there is none
 */
public ErasureCodingPolicy getEnabledPolicyByName(String name) {
  final ErasureCodingPolicy enabled = enabledPoliciesByName.get(name);
  if (enabled != null) {
    return enabled;
  }
  return name.equalsIgnoreCase(ErasureCodeConstants.REPLICATION_POLICY_NAME)
      ? SystemErasureCodingPolicies.getReplicationPolicy()
      : null;
}
/**
 * Check a storage policy ID against the list of storage policies accepted
 * for striped EC files.
 *
 * @param storagePolicyID the storage policy ID to check
 * @return true if the ID is in the accepted list, false otherwise
 */
public static boolean checkStoragePolicySuitableForECStripedMode(
    byte storagePolicyID) {
  for (byte accepted : SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE) {
    if (accepted == storagePolicyID) {
      return true;
    }
  }
  return false;
}
/**
 * Get every policy known to the manager, system defined as well as user
 * defined.
 *
 * The returned array is the manager's internal array, not a copy.
 *
 * @return all policies
 */
public ErasureCodingPolicyInfo[] getPolicies() {
  return this.allPolicies;
}
/**
 * Get all system defined and user defined policies in the state in which
 * they are written out to the fsimage.
 *
 * Persisted policies differ from the live ones in that a default policy
 * which is only enabled at startup shows up as disabled here, and therefore
 * in the fsimage.
 *
 * @return a freshly allocated array of the persisted policies
 */
public ErasureCodingPolicyInfo[] getPersistedPolicies() {
  final ErasureCodingPolicyInfo[] arrayType = new ErasureCodingPolicyInfo[0];
  return allPersistedPolicies.values().toArray(arrayType);
}
/**
 * Get a copy of the enabled policies array, taken while holding this
 * manager's monitor.
 *
 * @return a new array holding the currently enabled policies
 */
public ErasureCodingPolicy[] getCopyOfEnabledPolicies() {
  synchronized (this) {
    return enabledPolicies.clone();
  }
}
/**
 * Look up an {@link ErasureCodingPolicy} by its ID among system and user
 * defined policies.
 *
 * @param id the policy ID
 * @return the policy, or null if no policy has that ID
 */
public ErasureCodingPolicy getByID(byte id) {
  final ErasureCodingPolicyInfo found = getPolicyInfoByID(id);
  return found == null ? null : found.getPolicy();
}
/**
 * Look up the {@link ErasureCodingPolicyInfo} registered under a policy ID,
 * covering system and user defined policies.
 *
 * @param id the policy ID
 * @return the policy info, or null if no policy has that ID
 */
private ErasureCodingPolicyInfo getPolicyInfoByID(final byte id) {
  return policiesByID.get(id);
}
/**
 * Look up an {@link ErasureCodingPolicy} by name among system and user
 * defined policies.
 *
 * @param name the policy name
 * @return the policy, or null if no policy has that name
 */
public ErasureCodingPolicy getByName(String name) {
  final ErasureCodingPolicyInfo found = getPolicyInfoByName(name);
  return found == null ? null : found.getPolicy();
}
/**
 * Look up an {@link ErasureCodingPolicy} by name among system policies,
 * user defined policies and the replication policy.
 *
 * The replication policy is only considered when no registered policy has
 * the given name; its name is matched case-insensitively.
 *
 * @param name the policy name
 * @return the policy, or null if not found
 */
public ErasureCodingPolicy getErasureCodingPolicyByName(String name) {
  final ErasureCodingPolicyInfo found = getPolicyInfoByName(name);
  if (found != null) {
    return found.getPolicy();
  }
  if (name.equalsIgnoreCase(ErasureCodeConstants.REPLICATION_POLICY_NAME)) {
    return SystemErasureCodingPolicies.getReplicationPolicy();
  }
  return null;
}
/**
 * Look up the {@link ErasureCodingPolicyInfo} registered under a policy
 * name, covering system and user defined policies.
 *
 * @param name the policy name
 * @return the policy info, or null if no policy has that name
 */
private ErasureCodingPolicyInfo getPolicyInfoByName(final String name) {
  return policiesByName.get(name);
}
/**
* Clear and clean up.
*
* Currently a no-op: the body holds only a TODO, so calling this method
* leaves every policy table untouched.
*/
public void clear() {
// TODO: we should only clear policies loaded from NN metadata.
// This is a placeholder for HDFS-7337.
}
/**
 * Add a user defined erasure coding policy.
 *
 * The policy is registered under a name composed from its schema and cell
 * size and receives the next available policy ID. If a known policy already
 * has that name, or the same schema and cell size, that existing policy is
 * returned and nothing is added.
 *
 * @param policy the policy to add; its own name and ID are not used
 * @return the newly registered policy, or the already existing equivalent
 * @throws HadoopIllegalArgumentException if user defined policies are
 *         disabled, the codec is not supported, the cell size exceeds the
 *         configured maximum, or the maximum policy ID is already in use
 */
public synchronized ErasureCodingPolicy addPolicy(
    ErasureCodingPolicy policy) {
  if (!userDefinedAllowed) {
    throw new HadoopIllegalArgumentException(
        "Addition of user defined erasure coding policy is disabled.");
  }

  final String codecName = policy.getCodecName();
  if (!CodecUtil.hasCodec(codecName)) {
    throw new HadoopIllegalArgumentException("Codec name "
        + codecName + " is not supported");
  }

  final int cellSize = policy.getCellSize();
  if (cellSize > maxCellSize) {
    throw new HadoopIllegalArgumentException("Cell size " +
        cellSize + " should not exceed maximum " +
        maxCellSize + " bytes");
  }

  final String assignedNewName = ErasureCodingPolicy.composePolicyName(
      policy.getSchema(), cellSize);

  // Reuse an existing policy when either the composed name or the
  // (schema, cell size) pair is already known.
  for (ErasureCodingPolicyInfo known : getPolicies()) {
    final ErasureCodingPolicy existing = known.getPolicy();
    if (existing.getName().equals(assignedNewName)) {
      LOG.info("The policy name " + assignedNewName + " already exists");
      return existing;
    }
    final boolean sameSchema =
        existing.getSchema().equals(policy.getSchema());
    if (sameSchema && existing.getCellSize() == cellSize) {
      LOG.info("A policy with same schema "
          + policy.getSchema().toString() + " and cell size "
          + existing.getCellSize() + " already exists");
      return existing;
    }
  }

  if (getCurrentMaxPolicyID() == ErasureCodeConstants.MAX_POLICY_ID) {
    throw new HadoopIllegalArgumentException("Adding erasure coding " +
        "policy failed because the number of policies stored in the " +
        "system already reached the threshold, which is " +
        ErasureCodeConstants.MAX_POLICY_ID);
  }

  final ErasureCodingPolicy added = new ErasureCodingPolicy(
      assignedNewName, policy.getSchema(), cellSize,
      getNextAvailablePolicyID());
  final ErasureCodingPolicyInfo addedInfo =
      new ErasureCodingPolicyInfo(added);
  policiesByName.put(added.getName(), addedInfo);
  policiesByID.put(added.getId(), addedInfo);
  allPolicies =
      policiesByName.values().toArray(new ErasureCodingPolicyInfo[0]);
  // The persisted table keeps its own info object for the new policy.
  allPersistedPolicies.put(added.getId(),
      new ErasureCodingPolicyInfo(added));
  LOG.info("Added erasure coding policy " + added);
  return added;
}
/**
 * Find the largest policy ID currently registered.
 *
 * @return the largest key of the by-ID table, or 0 if the table is empty
 */
private byte getCurrentMaxPolicyID() {
  boolean seenAny = false;
  byte largest = 0;
  for (Byte id : policiesByID.keySet()) {
    if (!seenAny || id > largest) {
      largest = id;
      seenAny = true;
    }
  }
  return largest;
}
/**
 * Compute the ID for the next user defined policy: one past the current
 * maximum ID, but never below the start of the user defined ID range.
 *
 * @return the next policy ID to assign
 */
private byte getNextAvailablePolicyID() {
  final byte candidate = (byte) (getCurrentMaxPolicyID() + 1);
  if (candidate > ErasureCodeConstants.USER_DEFINED_POLICY_START_ID) {
    return candidate;
  }
  return ErasureCodeConstants.USER_DEFINED_POLICY_START_ID;
}
/**
 * Remove a user defined erasure coding policy by name.
 *
 * The policy is taken out of the enabled set if present and marked REMOVED
 * in both the live and the persisted tables; its entries are not deleted.
 *
 * @param name name of the policy to remove
 * @throws HadoopIllegalArgumentException if no policy has that name or the
 *         policy is a system policy
 */
public synchronized void removePolicy(String name) {
  final ErasureCodingPolicyInfo target = policiesByName.get(name);
  if (target == null) {
    throw new HadoopIllegalArgumentException("The policy name " +
        name + " does not exist");
  }

  final ErasureCodingPolicy ecPolicy = target.getPolicy();
  if (ecPolicy.isSystemPolicy()) {
    throw new HadoopIllegalArgumentException("System erasure coding policy " +
        name + " cannot be removed");
  }

  if (enabledPoliciesByName.containsKey(name)) {
    enabledPoliciesByName.remove(name);
    final ErasureCodingPolicy[] stillEnabled =
        enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
    enabledPolicies = stillEnabled;
  }

  target.setState(ErasureCodingPolicyState.REMOVED);
  LOG.info("Remove erasure coding policy " + name);

  final ErasureCodingPolicyInfo persisted =
      createPolicyInfo(ecPolicy, ErasureCodingPolicyState.REMOVED);
  allPersistedPolicies.put(ecPolicy.getId(), persisted);

  // TODO HDFS-12405 postpone the delete removed policy to Namenode restart
  // time.
}
/**
 * Collect the policies whose info is currently marked as removed.
 *
 * @return a new list of the removed policies, in policy name order
 */
@VisibleForTesting
public List<ErasureCodingPolicy> getRemovedPolicies() {
  return policiesByName.values().stream()
      .filter(ErasureCodingPolicyInfo::isRemoved)
      .map(ErasureCodingPolicyInfo::getPolicy)
      .collect(Collectors.toCollection(ArrayList::new));
}
/**
 * Disable an erasure coding policy by name.
 *
 * @param name name of the policy to disable
 * @return true if the policy was enabled and is now disabled, false if it
 *         was not enabled in the first place
 * @throws HadoopIllegalArgumentException if no policy has that name
 */
public synchronized boolean disablePolicy(String name) {
  final ErasureCodingPolicyInfo target = policiesByName.get(name);
  if (target == null) {
    throw new HadoopIllegalArgumentException("The policy name " +
        name + " does not exist");
  }

  if (!enabledPoliciesByName.containsKey(name)) {
    return false;
  }

  enabledPoliciesByName.remove(name);
  enabledPolicies =
      enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
  target.setState(ErasureCodingPolicyState.DISABLED);
  LOG.info("Disabled the erasure coding policy " + name);

  final ErasureCodingPolicy ecPolicy = target.getPolicy();
  allPersistedPolicies.put(ecPolicy.getId(),
      createPolicyInfo(ecPolicy, ErasureCodingPolicyState.DISABLED));
  return true;
}
/**
 * Enable an erasure coding policy by name.
 *
 * If the policy is already enabled, the call only has an effect when it is
 * the default policy: its persisted state is then set to ENABLED, since the
 * default policy may have been enabled at startup without being persisted
 * as such.
 *
 * @param name name of the policy to enable
 * @return true if the policy was newly enabled, or was the already enabled
 *         default policy; false if it was already enabled otherwise
 * @throws HadoopIllegalArgumentException if no policy has that name
 */
public synchronized boolean enablePolicy(String name) {
  final ErasureCodingPolicyInfo target = policiesByName.get(name);
  if (target == null) {
    throw new HadoopIllegalArgumentException("The policy name " +
        name + " does not exist");
  }

  final ErasureCodingPolicy ecPolicy = target.getPolicy();

  if (enabledPoliciesByName.containsKey(name)) {
    if (!defaultPolicyName.equals(name)) {
      return false;
    }
    allPersistedPolicies.put(ecPolicy.getId(),
        createPolicyInfo(ecPolicy, ErasureCodingPolicyState.ENABLED));
    return true;
  }

  enabledPoliciesByName.put(name, ecPolicy);
  target.setState(ErasureCodingPolicyState.ENABLED);
  enabledPolicies =
      enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
  allPersistedPolicies.put(ecPolicy.getId(),
      createPolicyInfo(ecPolicy, ErasureCodingPolicyState.ENABLED));
  LOG.info("Enabled the erasure coding policy " + name);
  return true;
}
/**
 * Load a single erasure coding policy into the manager.
 *
 * A policy whose codec is unavailable or whose cell size exceeds the
 * configured maximum has its state set to DISABLED before being registered.
 * A policy that is enabled is additionally run through
 * {@link #enablePolicy(String)}.
 *
 * @param info the policy info to load; must not be null
 */
private void loadPolicy(ErasureCodingPolicyInfo info) {
  Preconditions.checkNotNull(info);
  final ErasureCodingPolicy loaded = info.getPolicy();

  final boolean unsupported = !CodecUtil.hasCodec(loaded.getCodecName())
      || loaded.getCellSize() > maxCellSize;
  if (unsupported) {
    // The current system cannot serve this policy, so keep it but disabled.
    info.setState(ErasureCodingPolicyState.DISABLED);
  }

  policiesByName.put(loaded.getName(), info);
  policiesByID.put(loaded.getId(), info);
  if (info.isEnabled()) {
    enablePolicy(loaded.getName());
  }

  final ErasureCodingPolicyInfo persisted =
      createPolicyInfo(loaded, info.getState());
  allPersistedPolicies.put(loaded.getId(), persisted);
}
/**
 * Reload erasure coding policies from the fsImage, then enable the
 * configured default policy and refresh the cached policy arrays.
 *
 * @param ecPolicies the policy infos to load; must not be null
 * @param conf configuration naming the default policy
 * @throws IOException if the configured default policy is not a known policy
 */
public synchronized void loadPolicies(
    List<ErasureCodingPolicyInfo> ecPolicies, Configuration conf)
    throws IOException {
  Preconditions.checkNotNull(ecPolicies);
  ecPolicies.forEach(this::loadPolicy);
  enableDefaultPolicy(conf);
  updatePolicies();
}
/**
 * Read the default policy name from the configuration and, unless it is
 * empty, mark that policy ENABLED and add it to the enabled set.
 *
 * Only the live tables are touched; the persisted table is left as is.
 *
 * @param conf configuration naming the default policy
 * @throws IOException if the configured name matches no known policy
 */
private void enableDefaultPolicy(Configuration conf) throws IOException {
  defaultPolicyName = conf.getTrimmed(
      DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
      DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);
  if (defaultPolicyName.isEmpty()) {
    // No default policy configured: nothing to enable.
    return;
  }

  final ErasureCodingPolicyInfo defaultInfo =
      policiesByName.get(defaultPolicyName);
  if (defaultInfo == null) {
    final String available = policiesByName.values().stream()
        .map(ErasureCodingPolicyInfo::getPolicy)
        .map(ErasureCodingPolicy::getName)
        .collect(Collectors.joining(", "));
    throw new IOException(String.format(
        "EC policy '%s' specified at %s is not a "
            + "valid policy. Please choose from list of available "
            + "policies: [%s]",
        defaultPolicyName,
        DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
        available));
  }

  defaultInfo.setState(ErasureCodingPolicyState.ENABLED);
  final ErasureCodingPolicy defaultPolicy = defaultInfo.getPolicy();
  enabledPoliciesByName.put(defaultPolicy.getName(), defaultPolicy);
}
/**
 * Rebuild the cached arrays of enabled policies and of all policies from
 * their backing maps.
 */
private void updatePolicies() {
  final ErasureCodingPolicy[] enabledSnapshot =
      enabledPoliciesByName.values().toArray(new ErasureCodingPolicy[0]);
  final ErasureCodingPolicyInfo[] allSnapshot =
      policiesByName.values().toArray(new ErasureCodingPolicyInfo[0]);
  enabledPolicies = enabledSnapshot;
  allPolicies = allSnapshot;
}
/**
 * Render the names of the enabled policies for metrics reporting.
 *
 * @return the enabled policy names joined with ", "
 */
public String getEnabledPoliciesMetric() {
  final Iterable<String> enabledNames = enabledPoliciesByName.keySet();
  return StringUtils.join(", ", enabledNames);
}
/**
 * Build a new {@link ErasureCodingPolicyInfo} for a policy with the given
 * state.
 *
 * @param p the policy to wrap
 * @param s the state to set on the new info
 * @return the newly created policy info
 */
private ErasureCodingPolicyInfo createPolicyInfo(ErasureCodingPolicy p,
    ErasureCodingPolicyState s) {
  final ErasureCodingPolicyInfo created = new ErasureCodingPolicyInfo(p);
  created.setState(s);
  return created;
}
}