blob: 982fb8ea1eec615f3b1aeedfcdabc2c7a4d39a37 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.client;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerDataProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* This class provides the client-facing APIs of container operations.
*/
public class ContainerOperationClient implements ScmClient {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerOperationClient.class);
private static long containerSizeB = -1;
private final StorageContainerLocationProtocol
storageContainerLocationClient;
private final XceiverClientManager xceiverClientManager;
public ContainerOperationClient(
StorageContainerLocationProtocol
storageContainerLocationClient,
XceiverClientManager xceiverClientManager) {
this.storageContainerLocationClient = storageContainerLocationClient;
this.xceiverClientManager = xceiverClientManager;
}
/**
* Return the capacity of containers. The current assumption is that all
* containers have the same capacity. Therefore one static is sufficient for
* any container.
* @return The capacity of one container in number of bytes.
*/
public static long getContainerSizeB() {
return containerSizeB;
}
/**
* Set the capacity of container. Should be exactly once on system start.
* @param size Capacity of one container in number of bytes.
*/
public static void setContainerSizeB(long size) {
containerSizeB = size;
}
@Override
public ContainerWithPipeline createContainer(String owner)
throws IOException {
XceiverClientSpi client = null;
try {
ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), owner);
Pipeline pipeline = containerWithPipeline.getPipeline();
client = xceiverClientManager.acquireClient(pipeline);
Preconditions.checkState(pipeline.isOpen(), String
.format("Unexpected state=%s for pipeline=%s, expected state=%s",
pipeline.getPipelineState(), pipeline.getId(),
Pipeline.PipelineState.OPEN));
createContainer(client,
containerWithPipeline.getContainerInfo().getContainerID());
return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client, false);
}
}
}
/**
* Create a container over pipeline specified by the SCM.
*
* @param client - Client to communicate with Datanodes.
* @param containerId - Container ID.
* @throws IOException
*/
public void createContainer(XceiverClientSpi client,
long containerId) throws IOException {
ContainerProtocolCalls.createContainer(client, containerId, null);
// Let us log this info after we let SCM know that we have completed the
// creation state.
if (LOG.isDebugEnabled()) {
LOG.debug("Created container " + containerId
+ " machines:" + client.getPipeline().getNodes());
}
}
/**
* Creates a pipeline over the machines choosen by the SCM.
*
* @param client - Client
* @param pipeline - pipeline to be createdon Datanodes.
* @throws IOException
*/
private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
throws IOException {
Preconditions.checkNotNull(pipeline.getId(), "Pipeline " +
"name cannot be null when client create flag is set.");
// Pipeline creation is a three step process.
//
// 1. Notify SCM that this client is doing a create pipeline on
// datanodes.
//
// 2. Talk to Datanodes to create the pipeline.
//
// 3. update SCM that pipeline creation was successful.
// TODO: this has not been fully implemented on server side
// SCMClientProtocolServer#notifyObjectStageChange
// TODO: when implement the pipeline state machine, change
// the pipeline name (string) to pipeline id (long)
//storageContainerLocationClient.notifyObjectStageChange(
// ObjectStageChangeRequestProto.Type.pipeline,
// pipeline.getPipelineName(),
// ObjectStageChangeRequestProto.Op.create,
// ObjectStageChangeRequestProto.Stage.begin);
// client.createPipeline();
// TODO: Use PipelineManager to createPipeline
//storageContainerLocationClient.notifyObjectStageChange(
// ObjectStageChangeRequestProto.Type.pipeline,
// pipeline.getPipelineName(),
// ObjectStageChangeRequestProto.Op.create,
// ObjectStageChangeRequestProto.Stage.complete);
// TODO : Should we change the state on the client side ??
// That makes sense, but it is not needed for the client to work.
if (LOG.isDebugEnabled()) {
LOG.debug("Pipeline creation successful. Pipeline: {}",
pipeline.toString());
}
}
@Override
public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, String owner) throws IOException {
XceiverClientSpi client = null;
try {
// allocate container on SCM.
ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.allocateContainer(type, factor,
owner);
Pipeline pipeline = containerWithPipeline.getPipeline();
client = xceiverClientManager.acquireClient(pipeline);
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline);
createContainer(client,
containerWithPipeline.getContainerInfo().getContainerID());
return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client, false);
}
}
}
/**
* Returns a set of Nodes that meet a query criteria.
*
* @param nodeStatuses - Criteria that we want the node to have.
* @param queryScope - Query scope - Cluster or pool.
* @param poolName - if it is pool, a pool name is required.
* @return A set of nodes that meet the requested criteria.
* @throws IOException
*/
@Override
public List<HddsProtos.Node> queryNode(HddsProtos.NodeState
nodeStatuses, HddsProtos.QueryScope queryScope, String poolName)
throws IOException {
return storageContainerLocationClient.queryNode(nodeStatuses, queryScope,
poolName);
}
/**
* Creates a specified replication pipeline.
*/
@Override
public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
throws IOException {
return storageContainerLocationClient.createReplicationPipeline(type,
factor, nodePool);
}
@Override
public List<Pipeline> listPipelines() throws IOException {
return storageContainerLocationClient.listPipelines();
}
@Override
public void activatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
storageContainerLocationClient.activatePipeline(pipelineID);
}
@Override
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
storageContainerLocationClient.deactivatePipeline(pipelineID);
}
@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
storageContainerLocationClient.closePipeline(pipelineID);
}
@Override
public void close() {
try {
xceiverClientManager.close();
} catch (Exception ex) {
LOG.error("Can't close " + this.getClass().getSimpleName(), ex);
}
}
/**
* Deletes an existing container.
*
* @param containerId - ID of the container.
* @param pipeline - Pipeline that represents the container.
* @param force - true to forcibly delete the container.
* @throws IOException
*/
@Override
public void deleteContainer(long containerId, Pipeline pipeline,
boolean force) throws IOException {
XceiverClientSpi client = null;
try {
client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls
.deleteContainer(client, containerId, force, null);
storageContainerLocationClient
.deleteContainer(containerId);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted container {}, machines: {} ", containerId,
pipeline.getNodes());
}
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client, false);
}
}
}
/**
* Delete the container, this will release any resource it uses.
* @param containerID - containerID.
* @param force - True to forcibly delete the container.
* @throws IOException
*/
@Override
public void deleteContainer(long containerID, boolean force)
throws IOException {
ContainerWithPipeline info = getContainerWithPipeline(containerID);
deleteContainer(containerID, info.getPipeline(), force);
}
@Override
public List<ContainerInfo> listContainer(long startContainerID,
int count) throws IOException {
return storageContainerLocationClient.listContainer(
startContainerID, count);
}
/**
* Get meta data from an existing container.
*
* @param containerID - ID of the container.
* @param pipeline - Pipeline where the container is located.
* @return ContainerInfo
* @throws IOException
*/
@Override
public ContainerDataProto readContainer(long containerID,
Pipeline pipeline) throws IOException {
XceiverClientSpi client = null;
try {
client = xceiverClientManager.acquireClient(pipeline);
ReadContainerResponseProto response =
ContainerProtocolCalls.readContainer(client, containerID, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Read container {}, machines: {} ", containerID,
pipeline.getNodes());
}
return response.getContainerData();
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client, false);
}
}
}
/**
* Get meta data from an existing container.
* @param containerID - ID of the container.
* @return ContainerInfo - a message of protobuf which has basic info
* of a container.
* @throws IOException
*/
@Override
public ContainerDataProto readContainer(long containerID) throws IOException {
ContainerWithPipeline info = getContainerWithPipeline(containerID);
return readContainer(containerID, info.getPipeline());
}
/**
* Given an id, return the pipeline associated with the container.
* @param containerId - String Container ID
* @return Pipeline of the existing container, corresponding to the given id.
* @throws IOException
*/
@Override
public ContainerInfo getContainer(long containerId) throws
IOException {
return storageContainerLocationClient.getContainer(containerId);
}
/**
* Gets a container by Name -- Throws if the container does not exist.
*
* @param containerId - Container ID
* @return ContainerWithPipeline
* @throws IOException
*/
@Override
public ContainerWithPipeline getContainerWithPipeline(long containerId)
throws IOException {
return storageContainerLocationClient.getContainerWithPipeline(containerId);
}
/**
* Close a container.
*
* @param pipeline the container to be closed.
* @throws IOException
*/
@Override
public void closeContainer(long containerId, Pipeline pipeline)
throws IOException {
XceiverClientSpi client = null;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Close container {}", pipeline);
}
/*
TODO: two orders here, revisit this later:
1. close on SCM first, then on data node
2. close on data node first, then on SCM
with 1: if client failed after closing on SCM, then there is a
container SCM thinks as closed, but is actually open. Then SCM will no
longer allocate block to it, which is fine. But SCM may later try to
replicate this "closed" container, which I'm not sure is safe.
with 2: if client failed after close on datanode, then there is a
container SCM thinks as open, but is actually closed. Then SCM will still
try to allocate block to it. Which will fail when actually doing the
write. No more data can be written, but at least the correctness and
consistency of existing data will maintain.
For now, take the #2 way.
*/
// Actually close the container on Datanode
client = xceiverClientManager.acquireClient(pipeline);
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.begin);
ContainerProtocolCalls.closeContainer(client, containerId,
null);
// Notify SCM to close the container
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.complete);
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client, false);
}
}
}
/**
* Close a container.
*
* @throws IOException
*/
@Override
public void closeContainer(long containerId)
throws IOException {
ContainerWithPipeline info = getContainerWithPipeline(containerId);
Pipeline pipeline = info.getPipeline();
closeContainer(containerId, pipeline);
}
/**
* Get the the current usage information.
* @param containerID - ID of the container.
* @return the size of the given container.
* @throws IOException
*/
@Override
public long getContainerSize(long containerID) throws IOException {
// TODO : Fix this, it currently returns the capacity
// but not the current usage.
long size = getContainerSizeB();
if (size == -1) {
throw new IOException("Container size unknown!");
}
return size;
}
/**
* Check if SCM is in safe mode.
*
* @return Returns true if SCM is in safe mode else returns false.
* @throws IOException
*/
public boolean inSafeMode() throws IOException {
return storageContainerLocationClient.inSafeMode();
}
/**
* Force SCM out of safe mode.
*
* @return returns true if operation is successful.
* @throws IOException
*/
public boolean forceExitSafeMode() throws IOException {
return storageContainerLocationClient.forceExitSafeMode();
}
@Override
public void startReplicationManager() throws IOException {
storageContainerLocationClient.startReplicationManager();
}
@Override
public void stopReplicationManager() throws IOException {
storageContainerLocationClient.stopReplicationManager();
}
@Override
public boolean getReplicationManagerStatus() throws IOException {
return storageContainerLocationClient.getReplicationManagerStatus();
}
}