blob: c95612e61b7e1e486372d71036f3ff49ec416ebb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding
* copyright ownership. The ASF licenses this file to you under the Apache
* License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a
* copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software
* distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.server;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ProtocolMessageEnum;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancerConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.AuditLoggerType;
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.audit.SCMAction;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.thirdparty.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService.newReflectiveBlockingService;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
/**
* The RPC server that listens to requests from clients.
*/
public class SCMClientProtocolServer implements
StorageContainerLocationProtocol, Auditor {
// Logger for this server class.
private static final Logger LOG =
LoggerFactory.getLogger(SCMClientProtocolServer.class);
// Audit logger used by every read/write audit message emitted below.
private static final AuditLogger AUDIT =
new AuditLogger(AuditLoggerType.SCMLOGGER);
// Hadoop RPC server that serves StorageContainerLocationProtocol requests.
private final RPC.Server clientRpcServer;
// Address the RPC server actually bound to (resolved in the constructor).
private final InetSocketAddress clientRpcAddress;
// Owning SCM instance; source of managers (container, pipeline, node, ...).
private final StorageContainerManager scm;
// Per-request-type metrics for the container location protocol.
private final ProtocolMessageMetrics<ProtocolMessageEnum> protocolMetrics;
/**
 * Builds and binds the client-facing RPC server.
 *
 * @param conf Ozone configuration (handler count, security settings).
 * @param scm owning StorageContainerManager.
 * @throws IOException if the RPC server cannot be started.
 */
public SCMClientProtocolServer(OzoneConfiguration conf,
StorageContainerManager scm) throws IOException {
this.scm = scm;
final int handlerCount =
conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
OZONE_SCM_HANDLER_COUNT_DEFAULT);
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
protocolMetrics = ProtocolMessageMetrics
.create("ScmContainerLocationProtocol",
"SCM ContainerLocation protocol metrics",
StorageContainerLocationProtocolProtos.Type.values());
// SCM Container Service RPC
BlockingService storageProtoPbService =
newReflectiveBlockingService(
new StorageContainerLocationProtocolServerSideTranslatorPB(this,
scm,
protocolMetrics));
final InetSocketAddress scmAddress =
scm.getScmNodeDetails().getClientProtocolServerAddress();
clientRpcServer =
startRpcServer(
conf,
scmAddress,
StorageContainerLocationProtocolPB.class,
storageProtoPbService,
handlerCount);
// Record the address we actually bound to (port may have been ephemeral)
// and write it back into the configuration.
clientRpcAddress =
updateRPCListenAddress(conf,
scm.getScmNodeDetails().getClientProtocolServerAddressKey(),
scmAddress, clientRpcServer);
// Apply the service-level ACLs only when Hadoop authorization is enabled.
if (conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
false)) {
clientRpcServer.refreshServiceAcl(conf, SCMPolicyProvider.getInstance());
}
HddsServerUtil.addSuppressedLoggingExceptions(clientRpcServer);
}
/** @return the underlying Hadoop RPC server. */
public RPC.Server getClientRpcServer() {
return clientRpcServer;
}
/** @return the address this RPC server is listening on. */
public InetSocketAddress getClientRpcAddress() {
return clientRpcAddress;
}
/** Registers protocol metrics and starts accepting client RPCs. */
public void start() {
protocolMetrics.register();
LOG.info(
StorageContainerManager.buildRpcServerStartMessage(
"RPC server for Client ", getClientRpcAddress()));
getClientRpcServer().start();
}
/**
 * Unregisters metrics and stops the RPC server; RPC stop failures are
 * logged rather than propagated so shutdown can continue.
 */
public void stop() {
protocolMetrics.unregister();
try {
LOG.info("Stopping the RPC server for Client Protocol");
getClientRpcServer().stop();
} catch (Exception ex) {
LOG.error("Client Protocol RPC stop failed.", ex);
}
// Also closes the SCM node manager via the Closeable contract.
IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
}
/** Blocks until the RPC server terminates. */
public void join() throws InterruptedException {
LOG.trace("Join RPC server for Client Protocol");
getClientRpcServer().join();
}
/** @return the UGI of the caller of the current RPC, or null outside an RPC. */
public UserGroupInformation getRemoteUser() {
return Server.getRemoteUser();
}
/**
 * Allocates a new container on an open pipeline.
 *
 * @param replicationType replication type for the new container.
 * @param factor replication factor.
 * @param owner owner (service) requesting the container.
 * @return the container together with its pipeline.
 * @throws IOException on safe mode, authorization failure or allocation error.
 */
@Override
public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
replicationType, HddsProtos.ReplicationFactor factor,
String owner) throws IOException {
// Safe mode is checked before admin access: allocation is refused for
// everyone while SCM is in safe mode.
if (scm.getScmContext().isInSafeMode()) {
throw new SCMException("SafeModePrecheck failed for allocateContainer",
ResultCodes.SAFE_MODE_EXCEPTION);
}
getScm().checkAdminAccess(getRemoteUser());
final ContainerInfo container = scm.getContainerManager()
.allocateContainer(
ReplicationConfig.fromProtoTypeAndFactor(replicationType, factor),
owner);
final Pipeline pipeline = scm.getPipelineManager()
.getPipeline(container.getPipelineID());
return new ContainerWithPipeline(container, pipeline);
}
/**
 * Looks up container metadata by ID. Admin-only; success/failure of the
 * lookup (not the auth check, which happens before the try) is audited.
 *
 * @param containerID numeric container ID.
 * @return container info.
 * @throws IOException if the container does not exist or access is denied.
 */
@Override
public ContainerInfo getContainer(long containerID) throws IOException {
boolean auditSuccess = true;
Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("containerID", String.valueOf(containerID));
getScm().checkAdminAccess(getRemoteUser());
try {
return scm.getContainerManager()
.getContainer(ContainerID.valueOf(containerID));
} catch (IOException ex) {
auditSuccess = false;
AUDIT.logReadFailure(
buildAuditMessageForFailure(SCMAction.GET_CONTAINER, auditMap, ex)
);
throw ex;
} finally {
if(auditSuccess) {
AUDIT.logReadSuccess(
buildAuditMessageForSuccess(SCMAction.GET_CONTAINER, auditMap)
);
}
}
}
/**
 * Shared lookup used by the single and batch getContainerWithPipeline
 * calls: resolves the container and a pipeline that can serve it.
 *
 * @param containerID numeric container ID.
 * @return container plus a live or freshly-created pipeline.
 * @throws IOException if the container is missing, or it is open in safe
 *     mode without enough reported replicas.
 */
private ContainerWithPipeline getContainerWithPipelineCommon(
long containerID) throws IOException {
final ContainerID cid = ContainerID.valueOf(containerID);
final ContainerInfo container = scm.getContainerManager()
.getContainer(cid);
// In safe mode an open container is only served once enough replicas
// have been reported to SCM.
if (scm.getScmContext().isInSafeMode()) {
if (container.isOpen()) {
if (!hasRequiredReplicas(container)) {
throw new SCMException("Open container " + containerID + " doesn't"
+ " have enough replicas to service this operation in "
+ "Safe mode.", ResultCodes.SAFE_MODE_EXCEPTION);
}
}
}
Pipeline pipeline;
try {
// Only open containers still have a live write pipeline.
pipeline = container.isOpen() ? scm.getPipelineManager()
.getPipeline(container.getPipelineID()) : null;
} catch (PipelineNotFoundException ex) {
// The pipeline is destroyed.
pipeline = null;
}
if (pipeline == null) {
// Closed container (or destroyed pipeline): synthesize a pipeline from
// the datanodes currently holding replicas so reads can be routed.
pipeline = scm.getPipelineManager().createPipeline(
container.getReplicationConfig(),
scm.getContainerManager()
.getContainerReplicas(cid).stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList()));
}
return new ContainerWithPipeline(container, pipeline);
}
/**
 * Returns a container together with a pipeline that can serve it, and
 * audits the read.
 *
 * @param containerID numeric container ID.
 * @throws IOException if lookup fails.
 */
@Override
public ContainerWithPipeline getContainerWithPipeline(long containerID)
throws IOException {
// NOTE(review): checkAdminAccess is called with a null user here, unlike
// getContainer which passes getRemoteUser() — confirm whether this is
// intended to make the call available to non-admin clients.
getScm().checkAdminAccess(null);
try {
ContainerWithPipeline cp = getContainerWithPipelineCommon(containerID);
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.GET_CONTAINER_WITH_PIPELINE,
Collections.singletonMap("containerID",
ContainerID.valueOf(containerID).toString())));
return cp;
} catch (IOException ex) {
AUDIT.logReadFailure(buildAuditMessageForFailure(
SCMAction.GET_CONTAINER_WITH_PIPELINE,
Collections.singletonMap("containerID",
ContainerID.valueOf(containerID).toString()), ex));
throw ex;
}
}
/**
 * Returns the replica details known to SCM for the given container, one
 * proto entry per reported replica.
 *
 * @param containerId numeric container ID.
 * @return list of replica protos (state, datanode, usage, sequence id...).
 * @throws IOException if the container is unknown.
 */
@Override
public List<HddsProtos.SCMContainerReplicaProto>
    getContainerReplicas(long containerId) throws IOException {
  final Set<ContainerReplica> replicas = getScm().getContainerManager()
      .getContainerReplicas(ContainerID.valueOf(containerId));
  final List<HddsProtos.SCMContainerReplicaProto> protoList =
      new ArrayList<>(replicas.size());
  for (ContainerReplica replica : replicas) {
    final HddsProtos.SCMContainerReplicaProto proto =
        HddsProtos.SCMContainerReplicaProto.newBuilder()
            .setContainerID(containerId)
            .setState(replica.getState().toString())
            .setDatanodeDetails(
                replica.getDatanodeDetails().getProtoBufMessage())
            .setBytesUsed(replica.getBytesUsed())
            .setPlaceOfBirth(replica.getOriginDatanodeId().toString())
            .setKeyCount(replica.getKeyCount())
            .setSequenceID(replica.getSequenceId())
            .setReplicaIndex(replica.getReplicaIndex())
            .build();
    protoList.add(proto);
  }
  return protoList;
}
/**
 * Batch variant of getContainerWithPipeline: fails fast on the first
 * container that cannot be resolved (auditing that failure), otherwise
 * audits one success entry listing all requested IDs.
 *
 * @param containerIDs container IDs to resolve.
 * @throws IOException on the first failed lookup.
 */
@Override
public List<ContainerWithPipeline> getContainerWithPipelineBatch(
List<Long> containerIDs) throws IOException {
// NOTE(review): null user, like getContainerWithPipeline — confirm intent.
getScm().checkAdminAccess(null);
List<ContainerWithPipeline> cpList = new ArrayList<>();
// Audit string of resolved IDs; note it carries a trailing comma.
StringBuilder strContainerIDs = new StringBuilder();
for (Long containerID : containerIDs) {
try {
ContainerWithPipeline cp = getContainerWithPipelineCommon(containerID);
cpList.add(cp);
strContainerIDs.append(ContainerID.valueOf(containerID).toString());
strContainerIDs.append(",");
} catch (IOException ex) {
AUDIT.logReadFailure(buildAuditMessageForFailure(
SCMAction.GET_CONTAINER_WITH_PIPELINE_BATCH,
Collections.singletonMap("containerID",
ContainerID.valueOf(containerID).toString()), ex));
throw ex;
}
}
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.GET_CONTAINER_WITH_PIPELINE_BATCH,
Collections.singletonMap("containerIDs", strContainerIDs.toString())));
return cpList;
}
/**
 * Best-effort batch lookup: containers that cannot be resolved are simply
 * omitted from the result instead of failing the whole call.
 *
 * @param containerIDs container IDs to resolve.
 * @return container/pipeline pairs for every ID that resolved.
 */
@Override
public List<ContainerWithPipeline> getExistContainerWithPipelinesInBatch(
    List<Long> containerIDs) {
  List<ContainerWithPipeline> cpList = new ArrayList<>();
  for (Long containerID : containerIDs) {
    try {
      ContainerWithPipeline cp = getContainerWithPipelineCommon(containerID);
      cpList.add(cp);
    } catch (IOException ex) {
      // Deliberately best-effort: skip the missing/unavailable container,
      // but leave a trace instead of swallowing the exception silently.
      LOG.debug("Skipping container {} in batch lookup", containerID, ex);
    }
  }
  return cpList;
}
/**
 * Check if container reported replicas are equal or greater than required
 * replication factor.
 *
 * @param contInfo container to check.
 * @return true when the reported replica count meets the required node
 *     count; false when below it or when no replicas exist at all.
 */
private boolean hasRequiredReplicas(ContainerInfo contInfo) {
try{
return getScm().getContainerManager()
.getContainerReplicas(contInfo.containerID())
.size() >= contInfo.getReplicationConfig().getRequiredNodes();
} catch (ContainerNotFoundException ex) {
// getContainerReplicas throws exception if no replica's exist for given
// container.
return false;
}
}
/**
 * Lists a range of containers and get their info. Delegates to the
 * four-argument overload with no state or factor filter.
 *
 * @param startContainerID start containerID.
 * @param count count must be {@literal >} 0.
 *
 * @return a list of containers.
 * @throws IOException on lookup failure.
 */
@Override
public List<ContainerInfo> listContainer(long startContainerID,
int count) throws IOException {
return listContainer(startContainerID, count, null, null);
}
/**
 * Lists a range of containers and get their info. Delegates to the
 * four-argument overload with no factor filter.
 *
 * @param startContainerID start containerID.
 * @param count count must be {@literal >} 0.
 * @param state Container with this state will be returned.
 *
 * @return a list of containers.
 * @throws IOException on lookup failure.
 */
@Override
public List<ContainerInfo> listContainer(long startContainerID,
int count, HddsProtos.LifeCycleState state) throws IOException {
return listContainer(startContainerID, count, state, null);
}
/**
 * Lists a range of containers and get their info.
 *
 * <p>When a factor filter is given, EC containers are excluded (EC has no
 * replication factor). The whole call is audited as LIST_CONTAINER.
 *
 * @param startContainerID start containerID.
 * @param count count must be {@literal >} 0.
 * @param state Container with this state will be returned.
 * @param factor Container factor.
 * @return a list of containers.
 * @throws IOException on lookup failure.
 */
@Override
public List<ContainerInfo> listContainer(long startContainerID,
int count, HddsProtos.LifeCycleState state,
HddsProtos.ReplicationFactor factor) throws IOException {
boolean auditSuccess = true;
Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("startContainerID", String.valueOf(startContainerID));
auditMap.put("count", String.valueOf(count));
if (state != null) {
auditMap.put("state", state.name());
}
if (factor != null) {
auditMap.put("factor", factor.name());
}
try {
// Only used by the unfiltered branch; the filtered branches apply the
// startContainerID bound via stream filters instead.
final ContainerID containerId = ContainerID.valueOf(startContainerID);
if (state != null) {
if (factor != null) {
return scm.getContainerManager().getContainers(state).stream()
.filter(info -> info.containerID().getId() >= startContainerID)
//Filtering EC replication type as EC will not have factor.
.filter(info -> info
.getReplicationType() != HddsProtos.ReplicationType.EC)
.filter(info -> (info.getReplicationFactor() == factor))
.sorted().limit(count).collect(Collectors.toList());
} else {
return scm.getContainerManager().getContainers(state).stream()
.filter(info -> info.containerID().getId() >= startContainerID)
.sorted().limit(count).collect(Collectors.toList());
}
} else {
if (factor != null) {
return scm.getContainerManager().getContainers().stream()
.filter(info -> info.containerID().getId() >= startContainerID)
//Filtering EC replication type as EC will not have factor.
.filter(info -> info
.getReplicationType() != HddsProtos.ReplicationType.EC)
.filter(info -> info.getReplicationFactor() == factor)
.sorted().limit(count).collect(Collectors.toList());
} else {
// No filters: let the container manager page directly.
return scm.getContainerManager().getContainers(containerId, count);
}
}
} catch (Exception ex) {
auditSuccess = false;
AUDIT.logReadFailure(
buildAuditMessageForFailure(SCMAction.LIST_CONTAINER, auditMap, ex));
throw ex;
} finally {
if(auditSuccess) {
AUDIT.logReadSuccess(
buildAuditMessageForSuccess(SCMAction.LIST_CONTAINER, auditMap));
}
}
}
/**
 * Deletes a container. Admin-only; both the authorization failure and the
 * delete outcome are audited as DELETE_CONTAINER.
 *
 * @param containerID numeric container ID to delete.
 * @throws IOException on authorization or delete failure.
 */
@Override
public void deleteContainer(long containerID) throws IOException {
boolean auditSuccess = true;
Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("containerID", String.valueOf(containerID));
UserGroupInformation remoteUser = getRemoteUser();
auditMap.put("remoteUser", remoteUser.getUserName());
try {
getScm().checkAdminAccess(remoteUser);
scm.getContainerManager().deleteContainer(
ContainerID.valueOf(containerID));
} catch (Exception ex) {
auditSuccess = false;
AUDIT.logWriteFailure(
buildAuditMessageForFailure(SCMAction.DELETE_CONTAINER, auditMap, ex)
);
throw ex;
} finally {
if(auditSuccess) {
AUDIT.logWriteSuccess(
buildAuditMessageForSuccess(SCMAction.DELETE_CONTAINER, auditMap)
);
}
}
}
/**
 * Queries datanodes matching the given operational and health states and
 * returns them as protos with their current status attached.
 *
 * @param opState operational state filter (null = any).
 * @param state health state filter (null = any).
 * @param queryScope only CLUSTER scope is supported; POOL is rejected.
 * @param poolName unused until POOL scope is supported.
 * @param clientVersion client protocol version, used when encoding nodes.
 * @throws IOException if a matched node's status cannot be fetched.
 */
@Override
public List<HddsProtos.Node> queryNode(
HddsProtos.NodeOperationalState opState, HddsProtos.NodeState state,
HddsProtos.QueryScope queryScope, String poolName, int clientVersion)
throws IOException {
if (queryScope == HddsProtos.QueryScope.POOL) {
throw new IllegalArgumentException("Not Supported yet");
}
List<HddsProtos.Node> result = new ArrayList<>();
for (DatanodeDetails node : queryNode(opState, state)) {
try {
NodeStatus ns = scm.getScmNodeManager().getNodeStatus(node);
result.add(HddsProtos.Node.newBuilder()
.setNodeID(node.toProto(clientVersion))
.addNodeStates(ns.getHealth())
.addNodeOperationalStates(ns.getOperationalState())
.build());
} catch (NodeNotFoundException e) {
// The node was returned by the query a moment ago, so not finding its
// status now is unexpected.
throw new IOException(
"An unexpected error occurred querying the NodeStatus", e);
}
}
return result;
}
/**
 * Starts decommissioning the given datanodes. Admin-only.
 *
 * @param nodes datanode host names/addresses to decommission.
 * @return per-node errors for nodes that could not be decommissioned.
 * @throws IOException on authorization failure.
 */
@Override
public List<DatanodeAdminError> decommissionNodes(List<String> nodes)
throws IOException {
try {
getScm().checkAdminAccess(getRemoteUser());
return scm.getScmDecommissionManager().decommissionNodes(nodes);
} catch (Exception ex) {
LOG.error("Failed to decommission nodes", ex);
throw ex;
}
}
/**
 * Returns previously decommissioned/maintenance datanodes to service.
 * Admin-only.
 *
 * @param nodes datanode host names/addresses to recommission.
 * @return per-node errors for nodes that could not be recommissioned.
 * @throws IOException on authorization failure.
 */
@Override
public List<DatanodeAdminError> recommissionNodes(List<String> nodes)
throws IOException {
try {
getScm().checkAdminAccess(getRemoteUser());
return scm.getScmDecommissionManager().recommissionNodes(nodes);
} catch (Exception ex) {
LOG.error("Failed to recommission nodes", ex);
throw ex;
}
}
/**
 * Places the given datanodes into maintenance mode. Admin-only.
 *
 * @param nodes datanode host names/addresses.
 * @param endInHours automatic end of the maintenance window, in hours.
 * @return per-node errors for nodes that could not enter maintenance.
 * @throws IOException on authorization failure.
 */
@Override
public List<DatanodeAdminError> startMaintenanceNodes(List<String> nodes,
int endInHours) throws IOException {
try {
getScm().checkAdminAccess(getRemoteUser());
return scm.getScmDecommissionManager()
.startMaintenanceNodes(nodes, endInHours);
} catch (Exception ex) {
LOG.error("Failed to place nodes into maintenance mode", ex);
throw ex;
}
}
/**
 * Requests closing of an OPEN container by firing a CLOSE_CONTAINER event.
 * Admin-only; the outcome is audited as CLOSE_CONTAINER.
 *
 * @param containerID numeric container ID.
 * @throws IOException if the container is not OPEN or access is denied.
 */
@Override
public void closeContainer(long containerID) throws IOException {
final UserGroupInformation remoteUser = getRemoteUser();
final Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("containerID", String.valueOf(containerID));
auditMap.put("remoteUser", remoteUser.getUserName());
try {
scm.checkAdminAccess(remoteUser);
final ContainerID cid = ContainerID.valueOf(containerID);
final HddsProtos.LifeCycleState state = scm.getContainerManager()
.getContainer(cid).getState();
if (!state.equals(HddsProtos.LifeCycleState.OPEN)) {
throw new SCMException("Cannot close a " + state + " container.",
ResultCodes.UNEXPECTED_CONTAINER_STATE);
}
// Close is asynchronous: this only enqueues the event.
scm.getEventQueue().fireEvent(SCMEvents.CLOSE_CONTAINER,
ContainerID.valueOf(containerID));
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.CLOSE_CONTAINER, auditMap));
} catch (Exception ex) {
AUDIT.logWriteFailure(buildAuditMessageForFailure(
SCMAction.CLOSE_CONTAINER, auditMap, ex));
throw ex;
}
}
/**
 * Creates a new pipeline of the given type and factor.
 *
 * @param type replication type.
 * @param factor replication factor.
 * @param nodePool ignored by this implementation — node selection is left
 *     to the pipeline manager. NOTE(review): confirm callers do not expect
 *     the pool to constrain placement.
 * @return the created pipeline.
 * @throws IOException if pipeline creation fails.
 */
@Override
public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
throws IOException {
Pipeline result = scm.getPipelineManager()
.createPipeline(ReplicationConfig.fromProtoTypeAndFactor(type, factor));
AUDIT.logWriteSuccess(
buildAuditMessageForSuccess(SCMAction.CREATE_PIPELINE, null));
return result;
}
/** Returns all pipelines known to the pipeline manager; audited as a read. */
@Override
public List<Pipeline> listPipelines() {
AUDIT.logReadSuccess(
buildAuditMessageForSuccess(SCMAction.LIST_PIPELINE, null));
return scm.getPipelineManager().getPipelines();
}
/**
 * Looks up a single pipeline by its protobuf ID.
 *
 * @throws IOException if the pipeline does not exist.
 */
@Override
public Pipeline getPipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
return scm.getPipelineManager().getPipeline(
PipelineID.getFromProtobuf(pipelineID));
}
/**
 * Re-activates a previously deactivated pipeline.
 *
 * @param pipelineID pipeline to activate.
 * @throws IOException on authorization failure or if activation fails.
 */
@Override
public void activatePipeline(HddsProtos.PipelineID pipelineID)
    throws IOException {
  // Admin check added for consistency with deactivatePipeline: activating
  // a pipeline is a state-changing operation and was previously open to
  // any caller.
  getScm().checkAdminAccess(getRemoteUser());
  AUDIT.logReadSuccess(buildAuditMessageForSuccess(
      SCMAction.ACTIVATE_PIPELINE, null));
  scm.getPipelineManager().activatePipeline(
      PipelineID.getFromProtobuf(pipelineID));
}
/**
 * Deactivates a pipeline so it stops accepting new allocations. Admin-only.
 *
 * @param pipelineID pipeline to deactivate.
 * @throws IOException on authorization failure or if deactivation fails.
 */
@Override
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
getScm().checkAdminAccess(getRemoteUser());
// NOTE(review): audited via logReadSuccess although this is a write-style
// operation, and before the action runs — confirm intended.
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.DEACTIVATE_PIPELINE, null));
scm.getPipelineManager().deactivatePipeline(
PipelineID.getFromProtobuf(pipelineID));
}
/**
 * Closes a pipeline (also closing its containers, per the {@code true}
 * flag). Admin-only; success is audited as CLOSE_PIPELINE.
 *
 * @param pipelineID pipeline to close.
 * @throws IOException on authorization failure or if the close fails.
 */
@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
    throws IOException {
  getScm().checkAdminAccess(getRemoteUser());
  Map<String, String> auditMap = Maps.newHashMap();
  auditMap.put("pipelineID", pipelineID.getId());
  PipelineManager pipelineManager = scm.getPipelineManager();
  Pipeline pipeline =
      pipelineManager.getPipeline(PipelineID.getFromProtobuf(pipelineID));
  pipelineManager.closePipeline(pipeline, true);
  // Bug fix: auditMap was built but never passed, so the audit entry lost
  // the pipeline ID. Include it in the success message.
  AUDIT.logWriteSuccess(
      buildAuditMessageForSuccess(SCMAction.CLOSE_PIPELINE, auditMap)
  );
}
/**
 * Returns cluster/SCM identity plus the Ratis peer roles (or, without
 * Ratis, the local host/port string). Audited as GET_SCM_INFO.
 *
 * @return populated ScmInfo.
 * @throws IOException if the information cannot be assembled.
 */
@Override
public ScmInfo getScmInfo() throws IOException {
  boolean succeeded = true;
  try {
    final ScmInfo.Builder builder = new ScmInfo.Builder()
        .setClusterId(scm.getScmStorageConfig().getClusterID())
        .setScmId(scm.getScmStorageConfig().getScmId());
    if (scm.getScmHAManager().getRatisServer() == null) {
      // No Ratis server: report the local hostname with the ratis port as
      // the single "peer role" so callers still get a non-empty list.
      final String address = scm.getSCMHANodeDetails().getLocalNodeDetails()
          .getRatisHostPortStr();
      builder.setRatisPeerRoles(Arrays.asList(address));
    } else {
      builder.setRatisPeerRoles(
          scm.getScmHAManager().getRatisServer().getRatisRoles());
    }
    return builder.build();
  } catch (Exception ex) {
    succeeded = false;
    AUDIT.logReadFailure(
        buildAuditMessageForFailure(SCMAction.GET_SCM_INFO, null, ex));
    throw ex;
  } finally {
    if (succeeded) {
      AUDIT.logReadSuccess(
          buildAuditMessageForSuccess(SCMAction.GET_SCM_INFO, null));
    }
  }
}
/**
 * Check if SCM is in safe mode.
 *
 * @return Returns true if SCM is in safe mode else returns false.
 * @throws IOException declared by the protocol; not thrown here.
 */
@Override
public boolean inSafeMode() throws IOException {
AUDIT.logReadSuccess(
buildAuditMessageForSuccess(SCMAction.IN_SAFE_MODE, null)
);
return scm.isInSafeMode();
}
/**
 * @return per-rule safe mode status: rule name mapped to (passed, detail).
 * @throws IOException declared by the protocol.
 */
@Override
public Map<String, Pair<Boolean, String>> getSafeModeRuleStatuses()
throws IOException {
return scm.getRuleStatus();
}
/**
 * Force SCM out of Safe mode. Admin-only.
 *
 * @return returns true if operation is successful.
 * @throws IOException on authorization failure.
 */
@Override
public boolean forceExitSafeMode() throws IOException {
getScm().checkAdminAccess(getRemoteUser());
// Audited before the actual exit is attempted.
AUDIT.logWriteSuccess(
buildAuditMessageForSuccess(SCMAction.FORCE_EXIT_SAFE_MODE, null)
);
return scm.exitSafeMode();
}
/**
 * Starts the replication manager. Admin-only.
 *
 * @throws IOException on authorization failure.
 */
@Override
public void startReplicationManager() throws IOException {
getScm().checkAdminAccess(getRemoteUser());
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.START_REPLICATION_MANAGER, null));
scm.getReplicationManager().start();
}
/**
 * Stops the replication manager. Admin-only.
 *
 * @throws IOException on authorization failure.
 */
@Override
public void stopReplicationManager() throws IOException {
getScm().checkAdminAccess(getRemoteUser());
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.STOP_REPLICATION_MANAGER, null));
scm.getReplicationManager().stop();
}
/**
 * @return true when the replication manager is running.
 */
@Override
public boolean getReplicationManagerStatus() {
// NOTE(review): audited with logWriteSuccess although this is a read-only
// query — confirm the audit category is intentional.
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.GET_REPLICATION_MANAGER_STATUS, null));
return scm.getReplicationManager().isRunning();
}
/**
 * Returns the replication manager's current container report. Admin-only.
 *
 * @throws IOException on authorization failure.
 */
@Override
public ReplicationManagerReport getReplicationManagerReport()
throws IOException {
getScm().checkAdminAccess(getRemoteUser());
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.GET_REPLICATION_MANAGER_REPORT, null));
return scm.getReplicationManager().getContainerReport();
}
/**
 * Finalizes an SCM upgrade on behalf of the given upgrade client.
 * Admin-only.
 *
 * @param upgradeClientID identifier of the client driving finalization.
 * @return finalization status and progress messages.
 * @throws IOException on authorization or finalization failure.
 */
@Override
public StatusAndMessages finalizeScmUpgrade(String upgradeClientID) throws
IOException {
// check admin authorization
try {
getScm().checkAdminAccess(getRemoteUser());
} catch (IOException e) {
LOG.error("Authorization failed for finalize scm upgrade", e);
throw e;
}
return scm.finalizeUpgrade(upgradeClientID);
}
/**
 * Queries upgrade finalization progress. Read-only queries bypass the
 * admin check; non-readonly (e.g. forced takeover) requires admin.
 *
 * @param upgradeClientID identifier of the client driving finalization.
 * @param force whether to take over from another client.
 * @param readonly if true, only read the progress (no admin check).
 * @return finalization status and progress messages.
 * @throws IOException on authorization or query failure.
 */
@Override
public StatusAndMessages queryUpgradeFinalizationProgress(
String upgradeClientID, boolean force, boolean readonly)
throws IOException {
if (!readonly) {
// check admin authorization
try {
getScm().checkAdminAccess(getRemoteUser());
} catch (IOException e) {
LOG.error("Authorization failed for query scm upgrade finalization " +
"progress", e);
throw e;
}
}
return scm.queryUpgradeFinalizationProgress(upgradeClientID, force,
readonly);
}
/**
 * Starts the container balancer, overriding the configured defaults with
 * any present optional parameters (validated before use). Admin-only.
 *
 * @param threshold utilization threshold in [0.0, 100.0).
 * @param iterations positive iteration count, or -1 to run indefinitely.
 * @param maxDatanodesPercentageToInvolvePerIteration value in [0, 100].
 * @param maxSizeToMovePerIterationInGB positive size in GB.
 * @param maxSizeEnteringTarget positive size in GB.
 * @param maxSizeLeavingSource positive size in GB.
 * @return true when the balancer was started successfully.
 * @throws IOException on authorization failure.
 */
@Override
public boolean startContainerBalancer(
Optional<Double> threshold, Optional<Integer> iterations,
Optional<Integer> maxDatanodesPercentageToInvolvePerIteration,
Optional<Long> maxSizeToMovePerIterationInGB,
Optional<Long> maxSizeEnteringTarget,
Optional<Long> maxSizeLeavingSource) throws IOException {
getScm().checkAdminAccess(getRemoteUser());
// Start from the configured defaults and overlay explicit parameters.
ContainerBalancerConfiguration cbc =
scm.getConfiguration().getObject(ContainerBalancerConfiguration.class);
if (threshold.isPresent()) {
double tsd = threshold.get();
Preconditions.checkState(tsd >= 0.0D && tsd < 100.0D,
"threshold should be specified in range [0.0, 100.0).");
cbc.setThreshold(tsd);
}
if (maxSizeToMovePerIterationInGB.isPresent()) {
long mstm = maxSizeToMovePerIterationInGB.get();
Preconditions.checkState(mstm > 0,
"maxSizeToMovePerIterationInGB must be positive.");
// GB -> bytes.
cbc.setMaxSizeToMovePerIteration(mstm * OzoneConsts.GB);
}
if (maxDatanodesPercentageToInvolvePerIteration.isPresent()) {
int mdti = maxDatanodesPercentageToInvolvePerIteration.get();
Preconditions.checkState(mdti >= 0,
"maxDatanodesPercentageToInvolvePerIteration must be " +
"greater than equal to zero.");
Preconditions.checkState(mdti <= 100,
"maxDatanodesPercentageToInvolvePerIteration must be " +
"lesser than or equal to 100.");
cbc.setMaxDatanodesPercentageToInvolvePerIteration(mdti);
}
if (iterations.isPresent()) {
int i = iterations.get();
Preconditions.checkState(i > 0 || i == -1,
"number of iterations must be positive or" +
" -1 (for running container balancer infinitely).");
cbc.setIterations(i);
}
if (maxSizeEnteringTarget.isPresent()) {
long mset = maxSizeEnteringTarget.get();
Preconditions.checkState(mset > 0,
"maxSizeEnteringTarget must be " +
"greater than zero.");
// GB -> bytes.
cbc.setMaxSizeEnteringTarget(mset * OzoneConsts.GB);
}
if (maxSizeLeavingSource.isPresent()) {
long msls = maxSizeLeavingSource.get();
Preconditions.checkState(msls > 0,
"maxSizeLeavingSource must be " +
"greater than zero.");
// GB -> bytes.
cbc.setMaxSizeLeavingSource(msls * OzoneConsts.GB);
}
boolean isStartedSuccessfully = scm.getContainerBalancer().start(cbc);
if (isStartedSuccessfully) {
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.START_CONTAINER_BALANCER, null));
} else {
AUDIT.logWriteFailure(buildAuditMessageForSuccess(
SCMAction.START_CONTAINER_BALANCER, null));
}
return isStartedSuccessfully;
}
/**
 * Stops the container balancer. Admin-only.
 *
 * @throws IOException on authorization failure.
 */
@Override
public void stopContainerBalancer() throws IOException {
getScm().checkAdminAccess(getRemoteUser());
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.STOP_CONTAINER_BALANCER, null));
scm.getContainerBalancer().stop();
}
/**
 * @return true when the container balancer is running.
 */
@Override
public boolean getContainerBalancerStatus() {
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.GET_CONTAINER_BALANCER_STATUS, null));
return scm.getContainerBalancer().isBalancerRunning();
}
/**
 * Get Datanode usage info such as capacity, SCMUsed, and remaining by ip
 * or uuid.
 *
 * @param ipaddress Datanode Address String
 * @param uuid Datanode UUID String
 * @return List of DatanodeUsageInfoProto. Each element contains usage info
 * such as capacity, SCMUsed, and remaining space.
 * @throws IOException if admin authentication fails, or no datanode
 *     matches the given uuid/address.
 */
@Override
public List<HddsProtos.DatanodeUsageInfoProto> getDatanodeUsageInfo(
    String ipaddress, String uuid) throws IOException {
  // check admin authorisation
  try {
    getScm().checkAdminAccess(getRemoteUser());
  } catch (IOException e) {
    LOG.error("Authorization failed", e);
    throw e;
  }
  // get datanodes by ip or uuid
  List<DatanodeDetails> nodes = new ArrayList<>();
  if (!Strings.isNullOrEmpty(uuid)) {
    DatanodeDetails node = scm.getScmNodeManager().getNodeByUuid(uuid);
    if (node == null) {
      // Bug fix: a null lookup result used to be added to the list and
      // caused an NPE later in getUsageInfoFromDatanodeDetails. Fail fast
      // with a descriptive error instead.
      throw new IOException(
          "Could not get datanode with the specified parameters."
      );
    }
    nodes.add(node);
  } else if (!Strings.isNullOrEmpty(ipaddress)) {
    nodes = scm.getScmNodeManager().getNodesByAddress(ipaddress);
  } else {
    throw new IOException(
        "Could not get datanode with the specified parameters."
    );
  }
  // get datanode usage info
  List<HddsProtos.DatanodeUsageInfoProto> infoList = new ArrayList<>();
  for (DatanodeDetails node : nodes) {
    infoList.add(getUsageInfoFromDatanodeDetails(node));
  }
  return infoList;
}
/**
 * Get usage details for a specific DatanodeDetails node.
 *
 * @param node DatanodeDetails
 * @return Usage info such as capacity, SCMUsed, and remaining space.
 */
private HddsProtos.DatanodeUsageInfoProto getUsageInfoFromDatanodeDetails(
DatanodeDetails node) {
// NOTE(review): getNodeStat(node).get() is called unchecked — a node
// without stats would fail here; confirm callers only pass known nodes.
SCMNodeStat stat = scm.getScmNodeManager().getNodeStat(node).get();
long capacity = stat.getCapacity().get();
long used = stat.getScmUsed().get();
long remaining = stat.getRemaining().get();
return HddsProtos.DatanodeUsageInfoProto.newBuilder()
.setCapacity(capacity)
.setUsed(used)
.setRemaining(remaining)
.setNode(node.toProto(node.getCurrentVersion()))
.build();
}
/**
 * Get a sorted list of most or least used DatanodeUsageInfo containing
 * healthy, in-service nodes.
 *
 * @param mostUsed true for most used, false for least used ordering
 * @param count number of nodes to return; must be greater than zero
 * @return List of DatanodeUsageInfoProto with capacity, SCMUsed and
 *     remaining space per node.
 * @throws IOException if admin authentication fails
 * @throws IllegalArgumentException if count is not greater than zero
 */
@Override
public List<HddsProtos.DatanodeUsageInfoProto> getDatanodeUsageInfo(
    boolean mostUsed, int count) throws IOException, IllegalArgumentException {
  // Admin-only operation.
  try {
    getScm().checkAdminAccess(getRemoteUser());
  } catch (IOException e) {
    LOG.error("Authorization failed", e);
    throw e;
  }
  if (count < 1) {
    throw new IllegalArgumentException("The specified parameter count must " +
        "be an integer greater than zero.");
  }
  final List<DatanodeUsageInfo> usageInfos =
      scm.getScmNodeManager().getMostOrLeastUsedDatanodes(mostUsed);
  // A count larger than the number of healthy in-service nodes is simply
  // capped at the list size.
  final int resultSize = Math.min(count, usageInfos.size());
  return usageInfos.stream()
      .limit(resultSize)
      .map(DatanodeUsageInfo::toProto)
      .collect(Collectors.toList());
}
/**
 * Issues a container access token for the calling admin user.
 *
 * @param containerID container the token grants access to.
 * @return generated container token.
 * @throws IOException on authorization failure.
 */
@Override
public Token<?> getContainerToken(ContainerID containerID)
throws IOException {
UserGroupInformation remoteUser = getRemoteUser();
getScm().checkAdminAccess(remoteUser);
return scm.getContainerTokenGenerator()
.generateToken(remoteUser.getUserName(), containerID);
}
/**
 * @return total number of containers known to the container manager.
 * @throws IOException declared by the protocol.
 */
@Override
public long getContainerCount() throws IOException {
return scm.getContainerManager().getContainers().size();
}
/**
 * Queries a list of Node that match a set of statuses.
 *
 * <p>For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER, then
 * this call will return all
 * healthy nodes which members in Raft pipeline.
 *
 * <p>Right now we don't support operations, so we assume it is an AND
 * operation between the
 * operators.
 *
 * @param opState - NodeOperational State
 * @param state - NodeState.
 * @return List of Datanodes.
 */
public List<DatanodeDetails> queryNode(
HddsProtos.NodeOperationalState opState, HddsProtos.NodeState state) {
return new ArrayList<>(queryNodeState(opState, state));
}
/** @return the owning StorageContainerManager (exposed for tests). */
@VisibleForTesting
public StorageContainerManager getScm() {
return scm;
}
/**
 * Returns the current safe mode status from the SCM context.
 *
 * @return true if SCM is in safe mode.
 */
public boolean getSafeModeStatus() {
return scm.getScmContext().isInSafeMode();
}
/**
 * Query the node manager for datanodes matching the given operational
 * state and health state.
 *
 * @param opState - The node operational state
 * @param nodeState - NodeState that we are interested in matching.
 * @return Set of Datanodes that match the NodeState.
 */
private Set<DatanodeDetails> queryNodeState(
    HddsProtos.NodeOperationalState opState, HddsProtos.NodeState nodeState) {
  final Set<DatanodeDetails> matched = new TreeSet<>();
  final List<DatanodeDetails> candidates = scm.getScmNodeManager()
      .getNodes(opState, nodeState);
  // A null result from the node manager yields an empty set.
  if (candidates != null) {
    matched.addAll(candidates);
  }
  return matched;
}
/**
 * Builds a SUCCESS audit message for the current RPC caller.
 *
 * @param op audited action.
 * @param auditMap action parameters (may be null).
 */
@Override
public AuditMessage buildAuditMessageForSuccess(
AuditAction op, Map<String, String> auditMap) {
return new AuditMessage.Builder()
.setUser(getRemoteUserName())
.atIp(Server.getRemoteAddress())
.forOperation(op)
.withParams(auditMap)
.withResult(AuditEventStatus.SUCCESS)
.build();
}
/**
 * Builds a FAILURE audit message for the current RPC caller.
 *
 * @param op audited action.
 * @param auditMap action parameters (may be null).
 * @param throwable cause of the failure, attached to the message.
 */
@Override
public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
String> auditMap, Throwable throwable) {
return new AuditMessage.Builder()
.setUser(getRemoteUserName())
.atIp(Server.getRemoteAddress())
.forOperation(op)
.withParams(auditMap)
.withResult(AuditEventStatus.FAILURE)
.withException(throwable)
.build();
}
/** Closeable contract: delegates to {@code stop()}. */
@Override
public void close() throws IOException {
stop();
}
}