blob: 09fff2371eac93dc13103bf0f0fc3e8a36f320ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto.Builder;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.security.token.Token;
import com.google.common.base.Preconditions;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helpers for container tests.
*/
public final class ContainerTestHelper {
private static final Logger LOG = LoggerFactory.getLogger(
ContainerTestHelper.class);
public static final long CONTAINER_MAX_SIZE =
(long) StorageUnit.GB.toBytes(1);
public static final String DATANODE_UUID = UUID.randomUUID().toString();
private static final long DUMMY_CONTAINER_ID = 9999;
/**
* Never constructed.
*/
private ContainerTestHelper() {
}
// TODO: mock multi-node pipeline
/**
* Creates a ChunkInfo for testing.
*
* @param keyID - ID of the key
* @param seqNo - Chunk number.
* @return ChunkInfo
*/
public static ChunkInfo getChunk(long keyID, int seqNo, long offset,
long len) {
return new ChunkInfo(String.format("%d.data.%d", keyID,
seqNo), offset, len);
}
/**
* Generates some data of the requested len.
*
* @param len - Number of bytes.
* @return byte array with valid data.
*/
public static ChunkBuffer getData(int len) {
byte[] data = new byte[len];
ThreadLocalRandom.current().nextBytes(data);
return ChunkBuffer.wrap(ByteBuffer.wrap(data));
}
/**
* Computes the hash and sets the value correctly.
*
* @param info - chunk info.
* @param data - data array
*/
public static void setDataChecksum(ChunkInfo info, ChunkBuffer data)
throws OzoneChecksumException {
Checksum checksum = new Checksum(ChecksumType.CRC32,
1024 * 1024);
info.setChecksumData(checksum.computeChecksum(data));
data.rewind();
}
/**
* Returns a writeChunk Request.
*
* @param pipeline - A set of machines where this container lives.
* @param blockID - Block ID of the chunk.
* @param datalen - Length of data.
* @return ContainerCommandRequestProto
*/
public static ContainerCommandRequestProto getWriteChunkRequest(
Pipeline pipeline, BlockID blockID, int datalen)
throws IOException {
LOG.trace("writeChunk {} (blockID={}) to pipeline={}",
datalen, blockID, pipeline);
return newWriteChunkRequestBuilder(pipeline, blockID, datalen)
.build();
}
public static ContainerCommandRequestProto getListBlockRequest(
ContainerCommandRequestProto writeChunkRequest) {
return ContainerCommandRequestProto.newBuilder()
.setContainerID(writeChunkRequest.getContainerID())
.setCmdType(ContainerProtos.Type.ListBlock)
.setDatanodeUuid(writeChunkRequest.getDatanodeUuid())
.setListBlock(ContainerProtos.ListBlockRequestProto.newBuilder()
.setCount(10).build())
.build();
}
public static ContainerCommandRequestProto getPutBlockRequest(
ContainerCommandRequestProto writeChunkRequest) {
ContainerProtos.BlockData.Builder block =
ContainerProtos.BlockData.newBuilder()
.setSize(writeChunkRequest.getWriteChunk().getChunkData().getLen())
.setBlockID(writeChunkRequest.getWriteChunk().getBlockID())
.addChunks(writeChunkRequest.getWriteChunk().getChunkData());
return ContainerCommandRequestProto.newBuilder()
.setContainerID(writeChunkRequest.getContainerID())
.setCmdType(ContainerProtos.Type.PutBlock)
.setDatanodeUuid(writeChunkRequest.getDatanodeUuid())
.setPutBlock(ContainerProtos.PutBlockRequestProto.newBuilder()
.setBlockData(block.build())
.build())
.build();
}
public static Builder newWriteChunkRequestBuilder(Pipeline pipeline,
BlockID blockID, int datalen) throws IOException {
ChunkBuffer data = getData(datalen);
return newWriteChunkRequestBuilder(pipeline, blockID, data, 0);
}
public static Builder newWriteChunkRequestBuilder(
Pipeline pipeline, BlockID blockID, ChunkBuffer data, int seq)
throws IOException {
LOG.trace("writeChunk {} (blockID={}) to pipeline={}",
data.limit(), blockID, pipeline);
ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
ContainerProtos.WriteChunkRequestProto
.newBuilder();
writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
ChunkInfo info = getChunk(blockID.getLocalID(), seq, 0, data.limit());
setDataChecksum(info, data);
writeRequest.setChunkData(info.getProtoBufMessage());
writeRequest.setData(data.toByteString());
Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.WriteChunk);
request.setContainerID(blockID.getContainerID());
request.setWriteChunk(writeRequest);
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return request;
}
/**
* Returns PutSmallFile Request that we can send to the container.
*
* @param pipeline - Pipeline
* @param blockID - Block ID of the small file.
* @param dataLen - Number of bytes in the data
* @return ContainerCommandRequestProto
*/
public static ContainerCommandRequestProto getWriteSmallFileRequest(
Pipeline pipeline, BlockID blockID, int dataLen)
throws Exception {
ContainerProtos.PutSmallFileRequestProto.Builder smallFileRequest =
ContainerProtos.PutSmallFileRequestProto.newBuilder();
ChunkBuffer data = getData(dataLen);
ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, dataLen);
setDataChecksum(info, data);
ContainerProtos.PutBlockRequestProto.Builder putRequest =
ContainerProtos.PutBlockRequestProto.newBuilder();
BlockData blockData = new BlockData(blockID);
List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
newList.add(info.getProtoBufMessage());
blockData.setChunks(newList);
putRequest.setBlockData(blockData.getProtoBufMessage());
smallFileRequest.setChunkInfo(info.getProtoBufMessage());
smallFileRequest.setData(data.toByteString());
smallFileRequest.setBlock(putRequest);
Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.PutSmallFile);
request.setContainerID(blockID.getContainerID());
request.setPutSmallFile(smallFileRequest);
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return request.build();
}
public static ContainerCommandRequestProto getReadSmallFileRequest(
Pipeline pipeline, ContainerProtos.PutBlockRequestProto putKey)
throws Exception {
ContainerProtos.GetSmallFileRequestProto.Builder smallFileRequest =
ContainerProtos.GetSmallFileRequestProto.newBuilder();
ContainerCommandRequestProto getKey = getBlockRequest(pipeline, putKey);
smallFileRequest.setBlock(getKey.getGetBlock());
Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.GetSmallFile);
request.setContainerID(getKey.getGetBlock().getBlockID().getContainerID());
request.setGetSmallFile(smallFileRequest);
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return request.build();
}
/**
* Returns a read Request.
*
* @param pipeline pipeline.
* @param writeChunk writeChunkRequest.
* @return Request.
*/
public static ContainerCommandRequestProto getReadChunkRequest(
Pipeline pipeline, ContainerProtos.WriteChunkRequestProto writeChunk)
throws IOException {
return newReadChunkRequestBuilder(pipeline, writeChunk).build();
}
public static Builder newReadChunkRequestBuilder(Pipeline pipeline,
ContainerProtos.WriteChunkRequestProtoOrBuilder writeChunk)
throws IOException {
LOG.trace("readChunk blockID={} from pipeline={}",
writeChunk.getBlockID(), pipeline);
ContainerProtos.ReadChunkRequestProto.Builder readRequest =
ContainerProtos.ReadChunkRequestProto.newBuilder();
readRequest.setBlockID(writeChunk.getBlockID());
readRequest.setChunkData(writeChunk.getChunkData());
readRequest.setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
Builder newRequest =
ContainerCommandRequestProto.newBuilder();
newRequest.setCmdType(ContainerProtos.Type.ReadChunk);
newRequest.setContainerID(readRequest.getBlockID().getContainerID());
newRequest.setReadChunk(readRequest);
newRequest.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return newRequest;
}
/**
* Returns a create container command for test purposes. There are a bunch of
* tests where we need to just send a request and get a reply.
*
* @return ContainerCommandRequestProto.
*/
public static ContainerCommandRequestProto getCreateContainerRequest(
long containerID, Pipeline pipeline) throws IOException {
LOG.trace("addContainer: {}", containerID);
return getContainerCommandRequestBuilder(containerID, pipeline).build();
}
private static Builder getContainerCommandRequestBuilder(long containerID,
Pipeline pipeline) throws IOException {
Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.CreateContainer);
request.setContainerID(containerID);
request.setCreateContainer(
ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return request;
}
/**
* Returns a create container command for test purposes. There are a bunch of
* tests where we need to just send a request and get a reply.
*
* @return ContainerCommandRequestProto.
*/
public static ContainerCommandRequestProto getCreateContainerSecureRequest(
long containerID, Pipeline pipeline, Token<?> token) throws IOException {
LOG.trace("addContainer: {}", containerID);
Builder request = getContainerCommandRequestBuilder(containerID, pipeline);
if (token != null) {
request.setEncodedToken(token.encodeToUrlString());
}
return request.build();
}
/**
* Return an update container command for test purposes.
* Creates a container data based on the given meta data,
* and request to update an existing container with it.
*/
public static ContainerCommandRequestProto getUpdateContainerRequest(
long containerID, Map<String, String> metaData) throws IOException {
ContainerProtos.UpdateContainerRequestProto.Builder updateRequestBuilder =
ContainerProtos.UpdateContainerRequestProto.newBuilder();
String[] keys = metaData.keySet().toArray(new String[]{});
for (String key : keys) {
KeyValue.Builder kvBuilder = KeyValue.newBuilder();
kvBuilder.setKey(key);
kvBuilder.setValue(metaData.get(key));
updateRequestBuilder.addMetadata(kvBuilder.build());
}
Pipeline pipeline =
MockPipeline.createSingleNodePipeline();
Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.UpdateContainer);
request.setContainerID(containerID);
request.setUpdateContainer(updateRequestBuilder.build());
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return request.build();
}
/**
* Returns a create container response for test purposes. There are a bunch of
* tests where we need to just send a request and get a reply.
*
* @return ContainerCommandRequestProto.
*/
public static ContainerCommandResponseProto
getCreateContainerResponse(ContainerCommandRequestProto request) {
ContainerCommandResponseProto.Builder response =
ContainerCommandResponseProto.newBuilder();
response.setCmdType(ContainerProtos.Type.CreateContainer);
response.setTraceID(request.getTraceID());
response.setCreateContainer(
ContainerProtos.CreateContainerResponseProto.getDefaultInstance());
response.setResult(ContainerProtos.Result.SUCCESS);
return response.build();
}
/**
* Returns the PutBlockRequest for test purpose.
* @param pipeline - pipeline.
* @param writeRequest - Write Chunk Request.
* @return - Request
*/
public static ContainerCommandRequestProto getPutBlockRequest(
Pipeline pipeline, ContainerProtos.WriteChunkRequestProto writeRequest)
throws IOException {
return newPutBlockRequestBuilder(pipeline, writeRequest).build();
}
public static Builder newPutBlockRequestBuilder(Pipeline pipeline,
ContainerProtos.WriteChunkRequestProtoOrBuilder writeRequest)
throws IOException {
LOG.trace("putBlock: {} to pipeline={}",
writeRequest.getBlockID(), pipeline);
ContainerProtos.PutBlockRequestProto.Builder putRequest =
ContainerProtos.PutBlockRequestProto.newBuilder();
BlockData blockData = new BlockData(
BlockID.getFromProtobuf(writeRequest.getBlockID()));
List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
newList.add(writeRequest.getChunkData());
blockData.setChunks(newList);
blockData.setBlockCommitSequenceId(0);
putRequest.setBlockData(blockData.getProtoBufMessage());
Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.PutBlock);
request.setContainerID(blockData.getContainerID());
request.setPutBlock(putRequest);
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return request;
}
/**
* Gets a GetBlockRequest for test purpose.
* @param pipeline - pipeline
* @param putBlockRequest - putBlockRequest.
* @return - Request
* immediately.
*/
public static ContainerCommandRequestProto getBlockRequest(
Pipeline pipeline, ContainerProtos.PutBlockRequestProto putBlockRequest)
throws IOException {
return newGetBlockRequestBuilder(pipeline, putBlockRequest).build();
}
public static Builder newGetBlockRequestBuilder(
Pipeline pipeline, ContainerProtos.PutBlockRequestProtoOrBuilder putBlock)
throws IOException {
DatanodeBlockID blockID = putBlock.getBlockData().getBlockID();
ContainerProtos.GetBlockRequestProto.Builder getRequest =
ContainerProtos.GetBlockRequestProto.newBuilder();
getRequest.setBlockID(blockID);
Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.GetBlock);
request.setContainerID(blockID.getContainerID());
request.setGetBlock(getRequest);
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
return request;
}
/**
* Verify the response against the request.
*
* @param request - Request
* @param response - Response
*/
public static void verifyGetBlock(ContainerCommandRequestProto request,
ContainerCommandResponseProto response, int expectedChunksCount) {
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertEquals(expectedChunksCount,
response.getGetBlock().getBlockData().getChunksCount());
}
public static Builder newGetCommittedBlockLengthBuilder(Pipeline pipeline,
ContainerProtos.PutBlockRequestProtoOrBuilder putBlock)
throws IOException {
DatanodeBlockID blockID = putBlock.getBlockData().getBlockID();
ContainerProtos.GetCommittedBlockLengthRequestProto.Builder req =
ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder()
.setBlockID(blockID);
return ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.GetCommittedBlockLength)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(pipeline.getFirstNode().getUuidString())
.setGetCommittedBlockLength(req);
}
/**
* Returns a close container request.
* @param pipeline - pipeline
* @param containerID - ID of the container.
* @return ContainerCommandRequestProto.
*/
public static ContainerCommandRequestProto getCloseContainer(
Pipeline pipeline, long containerID) throws IOException {
return ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.CloseContainer)
.setContainerID(containerID)
.setCloseContainer(
ContainerProtos.CloseContainerRequestProto.getDefaultInstance())
.setDatanodeUuid(pipeline.getFirstNode().getUuidString())
.build();
}
/**
* Returns a simple request without traceId.
* @param pipeline - pipeline
* @param containerID - ID of the container.
* @return ContainerCommandRequestProto without traceId.
*/
public static ContainerCommandRequestProto getRequestWithoutTraceId(
Pipeline pipeline, long containerID) throws IOException {
Preconditions.checkNotNull(pipeline);
return ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.CloseContainer)
.setContainerID(containerID)
.setCloseContainer(
ContainerProtos.CloseContainerRequestProto.getDefaultInstance())
.setDatanodeUuid(pipeline.getFirstNode().getUuidString())
.build();
}
/**
* Returns a delete container request.
* @param pipeline - pipeline
* @return ContainerCommandRequestProto.
*/
public static ContainerCommandRequestProto getDeleteContainer(
Pipeline pipeline, long containerID, boolean forceDelete)
throws IOException {
Preconditions.checkNotNull(pipeline);
ContainerProtos.DeleteContainerRequestProto deleteRequest =
ContainerProtos.DeleteContainerRequestProto.newBuilder().
setForceDelete(forceDelete).build();
return ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.DeleteContainer)
.setContainerID(containerID)
.setDeleteContainer(
ContainerProtos.DeleteContainerRequestProto.getDefaultInstance())
.setDeleteContainer(deleteRequest)
.setDatanodeUuid(pipeline.getFirstNode().getUuidString())
.build();
}
private static void sleep(long milliseconds) {
try {
Thread.sleep(milliseconds);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static BlockID getTestBlockID(long containerID) {
// Add 2ms delay so that localID based on UtcTime
// won't collide.
sleep(2);
return new BlockID(containerID, HddsUtils.getTime());
}
public static long getTestContainerID() {
return HddsUtils.getTime();
}
public static String getFixedLengthString(String string, int length) {
return String.format("%1$" + length + "s", string);
}
/**
* Construct fake protobuf messages for various types of requests.
* This is tedious, however necessary to test. Protobuf classes are final
* and cannot be mocked by Mockito.
*
* @param cmdType type of the container command.
* @return
*/
public static ContainerCommandRequestProto getDummyCommandRequestProto(
ContainerProtos.Type cmdType) {
final Builder builder =
ContainerCommandRequestProto.newBuilder()
.setCmdType(cmdType)
.setContainerID(DUMMY_CONTAINER_ID)
.setDatanodeUuid(DATANODE_UUID);
final DatanodeBlockID fakeBlockId =
DatanodeBlockID.newBuilder()
.setContainerID(DUMMY_CONTAINER_ID).setLocalID(1)
.setBlockCommitSequenceId(101).build();
final ContainerProtos.ChunkInfo fakeChunkInfo =
ContainerProtos.ChunkInfo.newBuilder()
.setChunkName("dummy")
.setOffset(0)
.setLen(100)
.setChecksumData(ContainerProtos.ChecksumData.newBuilder()
.setBytesPerChecksum(1)
.setType(ChecksumType.CRC32)
.build())
.build();
switch (cmdType) {
case ReadContainer:
builder.setReadContainer(
ContainerProtos.ReadContainerRequestProto.newBuilder().build());
break;
case GetBlock:
builder.setGetBlock(ContainerProtos.GetBlockRequestProto.newBuilder()
.setBlockID(fakeBlockId).build());
break;
case GetCommittedBlockLength:
builder.setGetCommittedBlockLength(
ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder()
.setBlockID(fakeBlockId).build());
break;
case ReadChunk:
builder.setReadChunk(ContainerProtos.ReadChunkRequestProto.newBuilder()
.setBlockID(fakeBlockId).setChunkData(fakeChunkInfo)
.setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1).build());
break;
case GetSmallFile:
builder
.setGetSmallFile(ContainerProtos.GetSmallFileRequestProto.newBuilder()
.setBlock(ContainerProtos.GetBlockRequestProto.newBuilder()
.setBlockID(fakeBlockId)
.build())
.build());
break;
default:
Assert.fail("Unhandled request type " + cmdType + " in unit test");
}
return builder.build();
}
}