blob: 8f30a7fad10f0491f1f939d72234d79a10156baa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.client;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.hdds.protocol.proto.ContainerProtos
.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState
.ALLOCATED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState
.OPEN;
/**
* This class provides the client-facing APIs of container operations.
*/
public class ContainerOperationClient implements ScmClient {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerOperationClient.class);
private static long containerSizeB = -1;
private final StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private final XceiverClientManager xceiverClientManager;
public ContainerOperationClient(
StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient,
XceiverClientManager xceiverClientManager) {
this.storageContainerLocationClient = storageContainerLocationClient;
this.xceiverClientManager = xceiverClientManager;
}
/**
* Return the capacity of containers. The current assumption is that all
* containers have the same capacity. Therefore one static is sufficient for
* any container.
* @return The capacity of one container in number of bytes.
*/
public static long getContainerSizeB() {
return containerSizeB;
}
/**
* Set the capacity of container. Should be exactly once on system start.
* @param size Capacity of one container in number of bytes.
*/
public static void setContainerSizeB(long size) {
containerSizeB = size;
}
/**
* @inheritDoc
*/
@Override
public Pipeline createContainer(String containerId, String owner)
throws IOException {
XceiverClientSpi client = null;
try {
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerId, owner);
client = xceiverClientManager.acquireClient(pipeline);
// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
// which was choosen by the SCM.
Preconditions.checkState(pipeline.getLifeCycleState() == ALLOCATED ||
pipeline.getLifeCycleState() == OPEN, "Unexpected pipeline state");
if (pipeline.getLifeCycleState() == ALLOCATED) {
createPipeline(client, pipeline);
}
// TODO : Container Client State needs to be updated.
// TODO : Return ContainerInfo instead of Pipeline
createContainer(containerId, client, pipeline);
return pipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
}
}
}
/**
* Create a container over pipeline specified by the SCM.
*
* @param containerId - Container ID
* @param client - Client to communicate with Datanodes
* @param pipeline - A pipeline that is already created.
* @throws IOException
*/
public void createContainer(String containerId, XceiverClientSpi client,
Pipeline pipeline) throws IOException {
String traceID = UUID.randomUUID().toString();
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.begin);
ContainerProtocolCalls.createContainer(client, traceID);
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.complete);
// Let us log this info after we let SCM know that we have completed the
// creation state.
if (LOG.isDebugEnabled()) {
LOG.debug("Created container " + containerId
+ " leader:" + pipeline.getLeader()
+ " machines:" + pipeline.getMachines());
}
}
/**
* Creates a pipeline over the machines choosen by the SCM.
*
* @param client - Client
* @param pipeline - pipeline to be createdon Datanodes.
* @throws IOException
*/
private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
throws IOException {
Preconditions.checkNotNull(pipeline.getPipelineName(), "Pipeline " +
"name cannot be null when client create flag is set.");
// Pipeline creation is a three step process.
//
// 1. Notify SCM that this client is doing a create pipeline on
// datanodes.
//
// 2. Talk to Datanodes to create the pipeline.
//
// 3. update SCM that pipeline creation was successful.
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.pipeline,
pipeline.getPipelineName(),
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.begin);
client.createPipeline(pipeline.getPipelineName(),
pipeline.getMachines());
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.pipeline,
pipeline.getPipelineName(),
ObjectStageChangeRequestProto.Op.create,
ObjectStageChangeRequestProto.Stage.complete);
// TODO : Should we change the state on the client side ??
// That makes sense, but it is not needed for the client to work.
LOG.debug("Pipeline creation successful. Pipeline: {}",
pipeline.toString());
}
/**
* @inheritDoc
*/
@Override
public Pipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor,
String containerId, String owner) throws IOException {
XceiverClientSpi client = null;
try {
// allocate container on SCM.
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(type, factor,
containerId, owner);
client = xceiverClientManager.acquireClient(pipeline);
// Allocated State means that SCM has allocated this pipeline in its
// namespace. The client needs to create the pipeline on the machines
// which was choosen by the SCM.
if (pipeline.getLifeCycleState() == ALLOCATED) {
createPipeline(client, pipeline);
}
// TODO : Return ContainerInfo instead of Pipeline
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline);
createContainer(containerId, client, pipeline);
return pipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
}
}
}
/**
* Returns a set of Nodes that meet a query criteria.
*
* @param nodeStatuses - A set of criteria that we want the node to have.
* @param queryScope - Query scope - Cluster or pool.
* @param poolName - if it is pool, a pool name is required.
* @return A set of nodes that meet the requested criteria.
* @throws IOException
*/
@Override
public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState>
nodeStatuses, HddsProtos.QueryScope queryScope, String poolName)
throws IOException {
return storageContainerLocationClient.queryNode(nodeStatuses, queryScope,
poolName);
}
/**
* Creates a specified replication pipeline.
*/
@Override
public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
throws IOException {
return storageContainerLocationClient.createReplicationPipeline(type,
factor, nodePool);
}
/**
* Delete the container, this will release any resource it uses.
* @param pipeline - Pipeline that represents the container.
* @param force - True to forcibly delete the container.
* @throws IOException
*/
@Override
public void deleteContainer(Pipeline pipeline, boolean force)
throws IOException {
XceiverClientSpi client = null;
try {
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
ContainerProtocolCalls.deleteContainer(client, force, traceID);
storageContainerLocationClient
.deleteContainer(pipeline.getContainerName());
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted container {}, leader: {}, machines: {} ",
pipeline.getContainerName(),
pipeline.getLeader(),
pipeline.getMachines());
}
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
}
}
}
/**
* {@inheritDoc}
*/
@Override
public List<ContainerInfo> listContainer(String startName,
String prefixName, int count)
throws IOException {
return storageContainerLocationClient.listContainer(
startName, prefixName, count);
}
/**
* Get meta data from an existing container.
*
* @param pipeline - pipeline that represents the container.
* @return ContainerInfo - a message of protobuf which has basic info
* of a container.
* @throws IOException
*/
@Override
public ContainerData readContainer(Pipeline pipeline) throws IOException {
XceiverClientSpi client = null;
try {
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
ReadContainerResponseProto response =
ContainerProtocolCalls.readContainer(client,
pipeline.getContainerName(), traceID);
if (LOG.isDebugEnabled()) {
LOG.debug("Read container {}, leader: {}, machines: {} ",
pipeline.getContainerName(),
pipeline.getLeader(),
pipeline.getMachines());
}
return response.getContainerData();
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
}
}
}
/**
* Given an id, return the pipeline associated with the container.
* @param containerId - String Container ID
* @return Pipeline of the existing container, corresponding to the given id.
* @throws IOException
*/
@Override
public Pipeline getContainer(String containerId) throws
IOException {
return storageContainerLocationClient.getContainer(containerId);
}
/**
* Close a container.
*
* @param pipeline the container to be closed.
* @throws IOException
*/
@Override
public void closeContainer(Pipeline pipeline) throws IOException {
XceiverClientSpi client = null;
try {
LOG.debug("Close container {}", pipeline);
/*
TODO: two orders here, revisit this later:
1. close on SCM first, then on data node
2. close on data node first, then on SCM
with 1: if client failed after closing on SCM, then there is a
container SCM thinks as closed, but is actually open. Then SCM will no
longer allocate block to it, which is fine. But SCM may later try to
replicate this "closed" container, which I'm not sure is safe.
with 2: if client failed after close on datanode, then there is a
container SCM thinks as open, but is actually closed. Then SCM will still
try to allocate block to it. Which will fail when actually doing the
write. No more data can be written, but at least the correctness and
consistency of existing data will maintain.
For now, take the #2 way.
*/
// Actually close the container on Datanode
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
String containerId = pipeline.getContainerName();
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.begin);
ContainerProtocolCalls.closeContainer(client, traceID);
// Notify SCM to close the container
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
containerId,
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.complete);
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
}
}
}
/**
* Get the the current usage information.
* @param pipeline - Pipeline
* @return the size of the given container.
* @throws IOException
*/
@Override
public long getContainerSize(Pipeline pipeline) throws IOException {
// TODO : Pipeline can be null, handle it correctly.
long size = getContainerSizeB();
if (size == -1) {
throw new IOException("Container size unknown!");
}
return size;
}
}