blob: 3f4c55cfd11b709476fcb4f0e05bdc74a7e79908 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.cli;
import javax.net.SocketFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class provides the client-facing APIs of container operations.
*/
public class ContainerOperationClient implements ScmClient {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerOperationClient.class);
private final long containerSizeB;
private final HddsProtos.ReplicationFactor replicationFactor;
private final HddsProtos.ReplicationType replicationType;
private final StorageContainerLocationProtocol
storageContainerLocationClient;
public XceiverClientManager getXceiverClientManager() {
return xceiverClientManager;
}
private final XceiverClientManager xceiverClientManager;
public ContainerOperationClient(OzoneConfiguration conf) throws IOException {
storageContainerLocationClient = newContainerRpcClient(conf);
this.xceiverClientManager = newXCeiverClientManager(conf);
containerSizeB = (int) conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
boolean useRatis = conf.getBoolean(
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
if (useRatis) {
replicationFactor = HddsProtos.ReplicationFactor.THREE;
replicationType = HddsProtos.ReplicationType.RATIS;
} else {
replicationFactor = HddsProtos.ReplicationFactor.ONE;
replicationType = HddsProtos.ReplicationType.STAND_ALONE;
}
}
@VisibleForTesting
public StorageContainerLocationProtocol
getStorageContainerLocationProtocol() {
return storageContainerLocationClient;
}
private XceiverClientManager newXCeiverClientManager(ConfigurationSource conf)
throws IOException {
XceiverClientManager manager;
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
SecurityConfig securityConfig = new SecurityConfig(conf);
SCMSecurityProtocol scmSecurityProtocolClient = getScmSecurityClient(
(OzoneConfiguration) securityConfig.getConfiguration());
String caCertificate =
scmSecurityProtocolClient.getCACertificate();
manager = new XceiverClientManager(conf,
conf.getObject(XceiverClientManager.ScmClientConfig.class),
caCertificate);
} else {
manager = new XceiverClientManager(conf);
}
return manager;
}
public static StorageContainerLocationProtocol newContainerRpcClient(
ConfigurationSource configSource) throws IOException {
Class<StorageContainerLocationProtocolPB> protocol =
StorageContainerLocationProtocolPB.class;
Configuration conf =
LegacyHadoopConfigurationSource.asHadoopConfiguration(configSource);
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
long version = RPC.getProtocolVersion(protocol);
InetSocketAddress scmAddress = getScmAddressForClients(configSource);
UserGroupInformation user = UserGroupInformation.getCurrentUser();
SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(conf);
int rpcTimeOut = Client.getRpcTimeout(conf);
StorageContainerLocationProtocolPB rpcProxy =
RPC.getProxy(protocol, version, scmAddress, user, conf,
socketFactory, rpcTimeOut);
StorageContainerLocationProtocolClientSideTranslatorPB client =
new StorageContainerLocationProtocolClientSideTranslatorPB(rpcProxy);
return TracingUtil.createProxy(
client, StorageContainerLocationProtocol.class, configSource);
}
@Override
public ContainerWithPipeline createContainer(String owner)
throws IOException {
XceiverClientSpi client = null;
try {
ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.
allocateContainer(replicationType, replicationFactor, owner);
Pipeline pipeline = containerWithPipeline.getPipeline();
client = xceiverClientManager.acquireClient(pipeline);
Preconditions.checkState(
pipeline.isOpen(),
"Unexpected state=%s for pipeline=%s, expected state=%s",
pipeline.getPipelineState(), pipeline.getId(),
Pipeline.PipelineState.OPEN);
createContainer(client,
containerWithPipeline.getContainerInfo().getContainerID());
return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client, false);
}
}
}
/**
* Create a container over pipeline specified by the SCM.
*
* @param client - Client to communicate with Datanodes.
* @param containerId - Container ID.
* @throws IOException
*/
public void createContainer(XceiverClientSpi client,
long containerId) throws IOException {
ContainerProtocolCalls.createContainer(client, containerId, null);
// Let us log this info after we let SCM know that we have completed the
// creation state.
if (LOG.isDebugEnabled()) {
LOG.debug("Created container " + containerId
+ " machines:" + client.getPipeline().getNodes());
}
}
/**
* Creates a pipeline over the machines chosen by the SCM.
*
* @param client - Client
* @param pipeline - pipeline to be createdon Datanodes.
* @throws IOException
*/
private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
throws IOException {
Preconditions.checkNotNull(pipeline.getId(), "Pipeline " +
"name cannot be null when client create flag is set.");
// Pipeline creation is a three step process.
//
// 1. Notify SCM that this client is doing a create pipeline on
// datanodes.
//
// 2. Talk to Datanodes to create the pipeline.
//
// 3. update SCM that pipeline creation was successful.
// TODO: this has not been fully implemented on server side
// SCMClientProtocolServer#notifyObjectStageChange
// TODO: when implement the pipeline state machine, change
// the pipeline name (string) to pipeline id (long)
//storageContainerLocationClient.notifyObjectStageChange(
// ObjectStageChangeRequestProto.Type.pipeline,
// pipeline.getPipelineName(),
// ObjectStageChangeRequestProto.Op.create,
// ObjectStageChangeRequestProto.Stage.begin);
// client.createPipeline();
// TODO: Use PipelineManager to createPipeline
//storageContainerLocationClient.notifyObjectStageChange(
// ObjectStageChangeRequestProto.Type.pipeline,
// pipeline.getPipelineName(),
// ObjectStageChangeRequestProto.Op.create,
// ObjectStageChangeRequestProto.Stage.complete);
// TODO : Should we change the state on the client side ??
// That makes sense, but it is not needed for the client to work.
if (LOG.isDebugEnabled()) {
LOG.debug("Pipeline creation successful. Pipeline: {}",
pipeline);
}
}
@Override
public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, String owner) throws IOException {
XceiverClientSpi client = null;
try {
// allocate container on SCM.
ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.allocateContainer(type, factor,
owner);
Pipeline pipeline = containerWithPipeline.getPipeline();
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline);
createContainer(client,
containerWithPipeline.getContainerInfo().getContainerID());
return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client, false);
}
}
}
/**
* Returns a set of Nodes that meet a query criteria.
*
* @param opState - The operational state we want the node to have
* eg IN_SERVICE, DECOMMISSIONED, etc
* @param nodeState - The health we want the node to have, eg HEALTHY, STALE,
* etc
* @param queryScope - Query scope - Cluster or pool.
* @param poolName - if it is pool, a pool name is required.
* @return A set of nodes that meet the requested criteria.
* @throws IOException
*/
@Override
public List<HddsProtos.Node> queryNode(
HddsProtos.NodeOperationalState opState,
HddsProtos.NodeState nodeState,
HddsProtos.QueryScope queryScope, String poolName)
throws IOException {
return storageContainerLocationClient.queryNode(opState, nodeState,
queryScope, poolName, CURRENT_VERSION);
}
@Override
public void decommissionNodes(List<String> hosts) throws IOException {
storageContainerLocationClient.decommissionNodes(hosts);
}
@Override
public void recommissionNodes(List<String> hosts) throws IOException {
storageContainerLocationClient.recommissionNodes(hosts);
}
@Override
public void startMaintenanceNodes(List<String> hosts, int endHours)
throws IOException {
storageContainerLocationClient.startMaintenanceNodes(hosts, endHours);
}
/**
* Creates a specified replication pipeline.
*/
@Override
public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
throws IOException {
return storageContainerLocationClient.createReplicationPipeline(type,
factor, nodePool);
}
@Override
public List<Pipeline> listPipelines() throws IOException {
return storageContainerLocationClient.listPipelines();
}
@Override
public Pipeline getPipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
return storageContainerLocationClient.getPipeline(pipelineID);
}
@Override
public void activatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
storageContainerLocationClient.activatePipeline(pipelineID);
}
@Override
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
storageContainerLocationClient.deactivatePipeline(pipelineID);
}
@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
storageContainerLocationClient.closePipeline(pipelineID);
}
@Override
public void close() {
try {
xceiverClientManager.close();
} catch (Exception ex) {
LOG.error("Can't close " + this.getClass().getSimpleName(), ex);
}
}
/**
* Deletes an existing container.
*
* @param containerId - ID of the container.
* @param pipeline - Pipeline that represents the container.
* @param force - true to forcibly delete the container.
* @throws IOException
*/
@Override
public void deleteContainer(long containerId, Pipeline pipeline,
boolean force) throws IOException {
XceiverClientSpi client = null;
try {
client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls
.deleteContainer(client, containerId, force, null);
storageContainerLocationClient
.deleteContainer(containerId);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted container {}, machines: {} ", containerId,
pipeline.getNodes());
}
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client, false);
}
}
}
/**
* Delete the container, this will release any resource it uses.
*
* @param containerID - containerID.
* @param force - True to forcibly delete the container.
* @throws IOException
*/
@Override
public void deleteContainer(long containerID, boolean force)
throws IOException {
ContainerWithPipeline info = getContainerWithPipeline(containerID);
deleteContainer(containerID, info.getPipeline(), force);
}
@Override
public List<ContainerInfo> listContainer(long startContainerID,
int count) throws IOException {
return storageContainerLocationClient.listContainer(
startContainerID, count);
}
@Override
public List<ContainerInfo> listContainer(long startContainerID,
int count, HddsProtos.LifeCycleState state) throws IOException {
return storageContainerLocationClient.listContainer(
startContainerID, count, state);
}
/**
* Get meta data from an existing container.
*
* @param containerID - ID of the container.
* @param pipeline - Pipeline where the container is located.
* @return ContainerInfo
* @throws IOException
*/
@Override
public ContainerDataProto readContainer(long containerID,
Pipeline pipeline) throws IOException {
XceiverClientSpi client = null;
try {
client = xceiverClientManager.acquireClientForReadData(pipeline);
ReadContainerResponseProto response =
ContainerProtocolCalls.readContainer(client, containerID, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Read container {}, machines: {} ", containerID,
pipeline.getNodes());
}
return response.getContainerData();
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client, false);
}
}
}
/**
* Get meta data from an existing container.
*
* @param containerID - ID of the container.
* @return ContainerInfo - a message of protobuf which has basic info
* of a container.
* @throws IOException
*/
@Override
public ContainerDataProto readContainer(long containerID) throws IOException {
ContainerWithPipeline info = getContainerWithPipeline(containerID);
return readContainer(containerID, info.getPipeline());
}
/**
* Given an id, return the pipeline associated with the container.
*
* @param containerId - String Container ID
* @return Pipeline of the existing container, corresponding to the given id.
* @throws IOException
*/
@Override
public ContainerInfo getContainer(long containerId) throws
IOException {
return storageContainerLocationClient.getContainer(containerId);
}
/**
* Gets a container by Name -- Throws if the container does not exist.
*
* @param containerId - Container ID
* @return ContainerWithPipeline
* @throws IOException
*/
@Override
public ContainerWithPipeline getContainerWithPipeline(long containerId)
throws IOException {
return storageContainerLocationClient.getContainerWithPipeline(containerId);
}
/**
* Close a container.
*
* @throws IOException
*/
@Override
public void closeContainer(long containerId)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Close container {}", containerId);
}
storageContainerLocationClient.closeContainer(containerId);
}
/**
* Get the the current usage information.
*
* @param containerID - ID of the container.
* @return the size of the given container.
* @throws IOException
*/
@Override
public long getContainerSize(long containerID) throws IOException {
// TODO : Fix this, it currently returns the capacity
// but not the current usage.
return containerSizeB;
}
/**
* Check if SCM is in safe mode.
*
* @return Returns true if SCM is in safe mode else returns false.
* @throws IOException
*/
@Override
public boolean inSafeMode() throws IOException {
return storageContainerLocationClient.inSafeMode();
}
@Override
public Map<String, Pair<Boolean, String>> getSafeModeRuleStatuses()
throws IOException {
return storageContainerLocationClient.getSafeModeRuleStatuses();
}
/**
* Force SCM out of safe mode.
*
* @return returns true if operation is successful.
* @throws IOException
*/
public boolean forceExitSafeMode() throws IOException {
return storageContainerLocationClient.forceExitSafeMode();
}
@Override
public void startReplicationManager() throws IOException {
storageContainerLocationClient.startReplicationManager();
}
@Override
public void stopReplicationManager() throws IOException {
storageContainerLocationClient.stopReplicationManager();
}
@Override
public boolean getReplicationManagerStatus() throws IOException {
return storageContainerLocationClient.getReplicationManagerStatus();
}
/**
* Get Datanode Usage information by ipaddress or uuid.
*
* @param ipaddress datanode ipaddress String
* @param uuid datanode uuid String
* @return List of DatanodeUsageInfoProto. Each element contains info such as
* capacity, SCMused, and remaining space.
* @throws IOException
*/
@Override
public List<HddsProtos.DatanodeUsageInfoProto> getDatanodeUsageInfo(
String ipaddress, String uuid) throws IOException {
return storageContainerLocationClient.getDatanodeUsageInfo(ipaddress,
uuid);
}
/**
* Get usage information of most or least used datanodes.
*
* @param mostUsed true if most used, false if least used
* @param count Integer number of nodes to get info for
* @return List of DatanodeUsageInfoProto. Each element contains info such as
* capacity, SCMUsed, and remaining space.
* @throws IOException
*/
@Override
public List<HddsProtos.DatanodeUsageInfoProto> getDatanodeUsageInfo(
boolean mostUsed, int count) throws IOException {
return storageContainerLocationClient.getDatanodeUsageInfo(mostUsed, count);
}
}