blob: a262bb5bdbdd921e1d13dc995be7bf1efe688145 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SortDatanodesRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SortDatanodesResponseProto;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import static org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status.OK;
/**
 * Client-side translator that turns calls made on the
 * {@link ScmBlockLocationProtocol} interface into protobuf requests sent to
 * the RPC server implementing {@link ScmBlockLocationProtocolPB}.
 *
 * <p>Every call is wrapped in a {@link SCMBlockLocationRequest} carrying the
 * command type and the current trace id, sent through the RPC proxy, and the
 * wrapped response is checked for a non-OK status before it is unpacked.
 */
@InterfaceAudience.Private
public final class ScmBlockLocationProtocolClientSideTranslatorPB
    implements ScmBlockLocationProtocol, ProtocolTranslator, Closeable {

  /**
   * RpcController is not used and hence is set to null.
   */
  private static final RpcController NULL_RPC_CONTROLLER = null;

  /** Proxy through which all wrapped requests are sent. */
  private final ScmBlockLocationProtocolPB rpcProxy;

  /**
   * Creates a new ScmBlockLocationProtocolClientSideTranslatorPB.
   *
   * @param rpcProxy {@link ScmBlockLocationProtocolPB} RPC proxy
   */
  public ScmBlockLocationProtocolClientSideTranslatorPB(
      ScmBlockLocationProtocolPB rpcProxy) {
    this.rpcProxy = rpcProxy;
  }

  /**
   * Returns a SCMBlockLocationRequest builder with the specified type and
   * the trace id exported from the current span.
   *
   * @param cmdType type of the request
   * @return builder pre-populated with the command type and trace id
   */
  private SCMBlockLocationRequest.Builder createSCMBlockRequest(Type cmdType) {
    return SCMBlockLocationRequest.newBuilder()
        .setCmdType(cmdType)
        .setTraceID(TracingUtil.exportCurrentSpan());
  }

  /**
   * Submits client request to SCM server.
   *
   * @param req client request
   * @return response from SCM
   * @throws IOException thrown if any Protobuf service exception occurs
   */
  private SCMBlockLocationResponse submitRequest(
      SCMBlockLocationRequest req) throws IOException {
    try {
      return rpcProxy.send(NULL_RPC_CONTROLLER, req);
    } catch (ServiceException e) {
      // Translate the protobuf wrapper exception into an IOException.
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  /**
   * Verifies that the wrapped response carries the OK status.
   *
   * @param resp wrapped response received from SCM
   * @return the same response, if its status is OK
   * @throws SCMException carrying the response message if the status is
   *                      anything other than OK
   */
  private SCMBlockLocationResponse handleError(SCMBlockLocationResponse resp)
      throws SCMException {
    if (resp.getStatus() != OK) {
      // The result code is looked up by ordinal position of the status.
      // NOTE(review): this relies on the protobuf Status enum and
      // SCMException.ResultCodes declaring their constants in the same
      // order - confirm both are kept in sync.
      throw new SCMException(resp.getMessage(),
          SCMException.ResultCodes.values()[resp.getStatus().ordinal()]);
    }
    return resp;
  }

  /**
   * Asks SCM where a block should be allocated. SCM responds with the
   * set of datanodes that should be used creating this block.
   *
   * @param size - size of the block, must be greater than 0.
   * @param num - number of blocks.
   * @param type - replication type of the blocks.
   * @param factor - replication factor of the blocks.
   * @param owner - owner set on the allocation request.
   * @param excludeList - exclude list while allocating blocks.
   * @return allocated block accessing info (key, pipeline).
   * @throws IOException if the RPC fails or SCM returns a non-OK status.
   * @throws IllegalArgumentException if size is not greater than 0.
   */
  @Override
  public List<AllocatedBlock> allocateBlock(long size, int num,
      HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
      String owner, ExcludeList excludeList) throws IOException {
    Preconditions.checkArgument(size > 0, "block size must be greater than 0");

    AllocateScmBlockRequestProto request =
        AllocateScmBlockRequestProto.newBuilder()
            .setSize(size)
            .setNumBlocks(num)
            .setType(type)
            .setFactor(factor)
            .setOwner(owner)
            .setExcludeList(excludeList.getProtoBuf())
            .build();

    SCMBlockLocationRequest wrapper =
        createSCMBlockRequest(Type.AllocateScmBlock)
            .setAllocateScmBlockRequest(request)
            .build();

    final SCMBlockLocationResponse wrappedResponse =
        handleError(submitRequest(wrapper));
    final AllocateScmBlockResponseProto response =
        wrappedResponse.getAllocateScmBlockResponse();

    // Convert each protobuf block description into its client-side form.
    List<AllocatedBlock> blocks = new ArrayList<>(response.getBlocksCount());
    for (AllocateBlockResponse resp : response.getBlocksList()) {
      blocks.add(new AllocatedBlock.Builder()
          .setContainerBlockID(
              ContainerBlockID.getFromProtobuf(resp.getContainerBlockID()))
          .setPipeline(Pipeline.getFromProtobuf(resp.getPipeline()))
          .build());
    }
    return blocks;
  }

  /**
   * Delete the set of keys specified.
   *
   * @param keyBlocksInfoList batch of block keys to delete.
   * @return list of block deletion results.
   * @throws IOException if there is any failure.
   */
  @Override
  public List<DeleteBlockGroupResult> deleteKeyBlocks(
      List<BlockGroup> keyBlocksInfoList) throws IOException {
    List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
        .map(BlockGroup::getProto)
        .collect(Collectors.toList());

    DeleteScmKeyBlocksRequestProto request =
        DeleteScmKeyBlocksRequestProto.newBuilder()
            .addAllKeyBlocks(keyBlocksProto)
            .build();

    SCMBlockLocationRequest wrapper =
        createSCMBlockRequest(Type.DeleteScmKeyBlocks)
            .setDeleteScmKeyBlocksRequest(request)
            .build();

    final SCMBlockLocationResponse wrappedResponse =
        handleError(submitRequest(wrapper));
    final DeleteScmKeyBlocksResponseProto resp =
        wrappedResponse.getDeleteScmKeyBlocksResponse();

    // Collect straight into a pre-sized ArrayList instead of building a
    // temporary list and copying it.
    return resp.getResultsList().stream()
        .map(result -> new DeleteBlockGroupResult(result.getObjectKey(),
            DeleteBlockGroupResult
                .convertBlockResultProto(result.getBlockResultsList())))
        .collect(Collectors.toCollection(
            () -> new ArrayList<DeleteBlockGroupResult>(
                resp.getResultsCount())));
  }

  /**
   * Gets the cluster Id and Scm Id from SCM.
   *
   * @return ScmInfo built from the cluster id and SCM id in the response
   * @throws IOException if the RPC fails or SCM returns a non-OK status
   */
  @Override
  public ScmInfo getScmInfo() throws IOException {
    HddsProtos.GetScmInfoRequestProto request =
        HddsProtos.GetScmInfoRequestProto.getDefaultInstance();

    SCMBlockLocationRequest wrapper =
        createSCMBlockRequest(Type.GetScmInfo)
            .setGetScmInfoRequest(request)
            .build();

    final SCMBlockLocationResponse wrappedResponse =
        handleError(submitRequest(wrapper));
    final HddsProtos.GetScmInfoResponseProto resp =
        wrappedResponse.getGetScmInfoResponse();

    return new ScmInfo.Builder()
        .setClusterId(resp.getClusterId())
        .setScmId(resp.getScmId())
        .build();
  }

  /**
   * Sort the datanodes based on distance from client.
   *
   * @param nodes network names of the datanodes to sort
   * @param clientMachine client machine sent along with the request
   * @return list of {@link DatanodeDetails} in the order returned by SCM
   * @throws IOException if the RPC fails or SCM returns a non-OK status
   */
  @Override
  public List<DatanodeDetails> sortDatanodes(List<String> nodes,
      String clientMachine) throws IOException {
    SortDatanodesRequestProto request =
        SortDatanodesRequestProto.newBuilder()
            .addAllNodeNetworkName(nodes)
            .setClient(clientMachine)
            .build();

    SCMBlockLocationRequest wrapper =
        createSCMBlockRequest(Type.SortDatanodes)
            .setSortDatanodesRequest(request)
            .build();

    final SCMBlockLocationResponse wrappedResponse =
        handleError(submitRequest(wrapper));
    final SortDatanodesResponseProto resp =
        wrappedResponse.getSortDatanodesResponse();

    // Collect straight into a pre-sized ArrayList instead of building a
    // temporary list and copying it.
    return resp.getNodeList().stream()
        .map(DatanodeDetails::getFromProtoBuf)
        .collect(Collectors.toCollection(
            () -> new ArrayList<DatanodeDetails>(resp.getNodeCount())));
  }

  /**
   * Returns the RPC proxy this translator delegates to.
   *
   * @return the underlying {@link ScmBlockLocationProtocolPB} proxy
   */
  @Override
  public Object getUnderlyingProxyObject() {
    return rpcProxy;
  }

  /**
   * Stops the underlying RPC proxy.
   */
  @Override
  public void close() {
    RPC.stopProxy(rpcProxy);
  }
}