blob: d0ba60d9ad7e880edc64456ce8a8e0255a161654 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers
.BlockNotCommittedException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.CloseContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetBlockResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.GetSmallFileResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.PutBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.PutSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
PutSmallFileResponseProto;
import org.apache.hadoop.hdds.client.BlockID;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
* Implementation of all container protocol calls performed by Container
* clients.
*/
public final class ContainerProtocolCalls {
/**
* There is no need to instantiate this class.
*/
private ContainerProtocolCalls() {
}
/**
* Calls the container protocol to get a container block.
*
* @param xceiverClient client to perform call
* @param datanodeBlockID blockID to identify container
* @return container protocol get block response
* @throws IOException if there is an I/O error while performing the call
*/
public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
DatanodeBlockID datanodeBlockID) throws IOException {
GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
.newBuilder()
.setBlockID(datanodeBlockID);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.GetBlock)
.setContainerID(datanodeBlockID.getContainerID())
.setDatanodeUuid(id)
.setGetBlock(readBlockRequest);
String encodedToken = getEncodedBlockToken(getService(datanodeBlockID));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
xceiverClient.sendCommand(request, getValidatorList());
return response.getGetBlock();
}
/**
* Calls the container protocol to get the length of a committed block.
*
* @param xceiverClient client to perform call
* @param blockID blockId for the Block
* @return container protocol getLastCommittedBlockLength response
* @throws IOException if there is an I/O error while performing the call
*/
public static ContainerProtos.GetCommittedBlockLengthResponseProto
getCommittedBlockLength(
XceiverClientSpi xceiverClient, BlockID blockID)
throws IOException {
ContainerProtos.GetCommittedBlockLengthRequestProto.Builder
getBlockLengthRequestBuilder =
ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder().
setBlockID(blockID.getDatanodeBlockIDProtobuf());
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.GetCommittedBlockLength)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id)
.setGetCommittedBlockLength(getBlockLengthRequestBuilder);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
xceiverClient.sendCommand(request, getValidatorList());
return response.getGetCommittedBlockLength();
}
/**
* Calls the container protocol to put a container block.
*
* @param xceiverClient client to perform call
* @param containerBlockData block data to identify container
* @return putBlockResponse
* @throws IOException if there is an I/O error while performing the call
*/
public static ContainerProtos.PutBlockResponseProto putBlock(
XceiverClientSpi xceiverClient, BlockData containerBlockData)
throws IOException {
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder().setBlockData(containerBlockData);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
.setContainerID(containerBlockData.getBlockID().getContainerID())
.setDatanodeUuid(id)
.setPutBlock(createBlockRequest);
String encodedToken =
getEncodedBlockToken(getService(containerBlockData.getBlockID()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
xceiverClient.sendCommand(request, getValidatorList());
return response.getPutBlock();
}
/**
* Calls the container protocol to put a container block.
*
* @param xceiverClient client to perform call
* @param containerBlockData block data to identify container
* @return putBlockResponse
* @throws IOException if there is an error while performing the call
* @throws InterruptedException
* @throws ExecutionException
*/
public static XceiverClientReply putBlockAsync(
XceiverClientSpi xceiverClient, BlockData containerBlockData)
throws IOException, InterruptedException, ExecutionException {
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder().setBlockData(containerBlockData);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
.setContainerID(containerBlockData.getBlockID().getContainerID())
.setDatanodeUuid(id)
.setPutBlock(createBlockRequest);
String encodedToken =
getEncodedBlockToken(getService(containerBlockData.getBlockID()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
return xceiverClient.sendCommandAsync(request);
}
/**
* Calls the container protocol to read a chunk.
*
* @param xceiverClient client to perform call
* @param chunk information about chunk to read
* @param blockID ID of the block
* @param validators functions to validate the response
* @return container protocol read chunk response
* @throws IOException if there is an I/O error while performing the call
*/
public static ContainerProtos.ReadChunkResponseProto readChunk(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
List<CheckedBiFunction> validators) throws IOException {
ReadChunkRequestProto.Builder readChunkRequest =
ReadChunkRequestProto.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(chunk);
String id = xceiverClient.getPipeline().getClosestNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id).setReadChunk(readChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto reply =
xceiverClient.sendCommand(request, validators);
return reply.getReadChunk();
}
/**
* Calls the container protocol to write a chunk.
*
* @param xceiverClient client to perform call
* @param chunk information about chunk to write
* @param blockID ID of the block
* @param data the data of the chunk to write
* @throws IOException if there is an error while performing the call
*/
public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
BlockID blockID, ByteString data)
throws IOException {
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(chunk)
.setData(data);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.WriteChunk)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id)
.setWriteChunk(writeChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
xceiverClient.sendCommand(request, getValidatorList());
}
/**
* Calls the container protocol to write a chunk.
*
* @param xceiverClient client to perform call
* @param chunk information about chunk to write
* @param blockID ID of the block
* @param data the data of the chunk to write
* @throws IOException if there is an I/O error while performing the call
*/
public static XceiverClientReply writeChunkAsync(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
ByteString data)
throws IOException, ExecutionException, InterruptedException {
WriteChunkRequestProto.Builder writeChunkRequest =
WriteChunkRequestProto.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(chunk).setData(data);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.WriteChunk)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id).setWriteChunk(writeChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
return xceiverClient.sendCommandAsync(request);
}
/**
* Allows writing a small file using single RPC. This takes the container
* name, block name and data to write sends all that data to the container
* using a single RPC. This API is designed to be used for files which are
* smaller than 1 MB.
*
* @param client - client that communicates with the container.
* @param blockID - ID of the block
* @param data - Data to be written into the container.
* @return container protocol writeSmallFile response
* @throws IOException
*/
public static PutSmallFileResponseProto writeSmallFile(
XceiverClientSpi client, BlockID blockID, byte[] data)
throws IOException {
BlockData containerBlockData =
BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
.build();
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder()
.setBlockData(containerBlockData);
KeyValue keyValue =
KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true")
.build();
Checksum checksum = new Checksum();
ChecksumData checksumData = checksum.computeChecksum(data, 0, data.length);
ChunkInfo chunk =
ChunkInfo.newBuilder()
.setChunkName(blockID.getLocalID() + "_chunk")
.setOffset(0)
.setLen(data.length)
.addMetadata(keyValue)
.setChecksumData(checksumData.getProtoBufMessage())
.build();
PutSmallFileRequestProto putSmallFileRequest =
PutSmallFileRequestProto.newBuilder().setChunkInfo(chunk)
.setBlock(createBlockRequest).setData(ByteString.copyFrom(data))
.build();
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.PutSmallFile)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id)
.setPutSmallFile(putSmallFileRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
client.sendCommand(request, getValidatorList());
return response.getPutSmallFile();
}
/**
* createContainer call that creates a container on the datanode.
* @param client - client
* @param containerID - ID of container
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static void createContainer(XceiverClientSpi client, long containerID,
String encodedToken) throws IOException {
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto
.newBuilder();
createRequest.setContainerType(ContainerProtos.ContainerType
.KeyValueContainer);
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
request.setCmdType(ContainerProtos.Type.CreateContainer);
request.setContainerID(containerID);
request.setCreateContainer(createRequest.build());
request.setDatanodeUuid(id);
client.sendCommand(request.build(), getValidatorList());
}
/**
* Deletes a container from a pipeline.
*
* @param client
* @param force whether or not to forcibly delete the container.
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static void deleteContainer(XceiverClientSpi client, long containerID,
boolean force, String encodedToken) throws IOException {
ContainerProtos.DeleteContainerRequestProto.Builder deleteRequest =
ContainerProtos.DeleteContainerRequestProto.newBuilder();
deleteRequest.setForceDelete(force);
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.DeleteContainer);
request.setContainerID(containerID);
request.setDeleteContainer(deleteRequest);
request.setDatanodeUuid(id);
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
}
client.sendCommand(request.build(), getValidatorList());
}
/**
* Close a container.
*
* @param client
* @param containerID
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static void closeContainer(XceiverClientSpi client,
long containerID, String encodedToken)
throws IOException {
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(Type.CloseContainer);
request.setContainerID(containerID);
request.setCloseContainer(CloseContainerRequestProto.getDefaultInstance());
request.setDatanodeUuid(id);
if(encodedToken != null) {
request.setEncodedToken(encodedToken);
}
client.sendCommand(request.build(), getValidatorList());
}
/**
* readContainer call that gets meta data from an existing container.
*
* @param client - client
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static ReadContainerResponseProto readContainer(
XceiverClientSpi client, long containerID, String encodedToken)
throws IOException {
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(Type.ReadContainer);
request.setContainerID(containerID);
request.setReadContainer(ReadContainerRequestProto.getDefaultInstance());
request.setDatanodeUuid(id);
if(encodedToken != null) {
request.setEncodedToken(encodedToken);
}
ContainerCommandResponseProto response =
client.sendCommand(request.build(), getValidatorList());
return response.getReadContainer();
}
/**
* Reads the data given the blockID.
*
* @param client
* @param blockID - ID of the block
* @return GetSmallFileResponseProto
* @throws IOException
*/
public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
BlockID blockID) throws IOException {
GetBlockRequestProto.Builder getBlock = GetBlockRequestProto
.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf());
ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
GetSmallFileRequestProto
.newBuilder().setBlock(getBlock)
.build();
String id = client.getPipeline().getClosestNode().getUuidString();
ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.GetSmallFile)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id)
.setGetSmallFile(getSmallFileRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
client.sendCommand(request, getValidatorList());
return response.getGetSmallFile();
}
/**
* Validates a response from a container protocol call. Any non-successful
* return code is mapped to a corresponding exception and thrown.
*
* @param response container protocol call response
* @throws StorageContainerException if the container protocol call failed
*/
public static void validateContainerResponse(
ContainerCommandResponseProto response
) throws StorageContainerException {
if (response.getResult() == ContainerProtos.Result.SUCCESS) {
return;
} else if (response.getResult()
== ContainerProtos.Result.BLOCK_NOT_COMMITTED) {
throw new BlockNotCommittedException(response.getMessage());
} else if (response.getResult()
== ContainerProtos.Result.CLOSED_CONTAINER_IO) {
throw new ContainerNotOpenException(response.getMessage());
}
throw new StorageContainerException(
response.getMessage(), response.getResult());
}
/**
* Returns a url encoded block token. Service param should match the service
* field of token.
* @param service
*
* */
private static String getEncodedBlockToken(Text service)
throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Token<OzoneBlockTokenIdentifier> token =
OzoneBlockTokenSelector.selectBlockToken(service, ugi.getTokens());
if (token != null) {
return token.encodeToUrlString();
}
return null;
}
private static Text getService(DatanodeBlockID blockId) {
return new Text(new StringBuffer()
.append("conID: ")
.append(blockId.getContainerID())
.append(" locID: ")
.append(blockId.getLocalID())
.toString());
}
public static List<CheckedBiFunction> getValidatorList() {
List<CheckedBiFunction> validators = new ArrayList<>(1);
CheckedBiFunction<ContainerProtos.ContainerCommandRequestProto,
ContainerProtos.ContainerCommandResponseProto, IOException>
validator = (request, response) -> validateContainerResponse(response);
validators.add(validator);
return validators;
}
}