/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.impl;

import com.google.common.collect.Maps;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto
.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto
.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
* Test-cases to verify the functionality of HddsDispatcher.
*/
public class TestHddsDispatcher {

  @Test
public void testContainerCloseActionWhenFull() throws IOException {
String testDir = GenericTestUtils.getTempPath(
TestHddsDispatcher.class.getSimpleName());
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HDDS_DATANODE_DIR_KEY, testDir);
DatanodeDetails dd = randomDatanodeDetails();
VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf);
try {
UUID scmId = UUID.randomUUID();
ContainerSet containerSet = new ContainerSet();
DatanodeStateMachine stateMachine = Mockito.mock(
DatanodeStateMachine.class);
StateContext context = Mockito.mock(StateContext.class);
Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(dd);
Mockito.when(context.getParent()).thenReturn(stateMachine);
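      // Create a container with a 1 GB size limit and add it to the
      // container set.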
KeyValueContainerData containerData = new KeyValueContainerData(1L,
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
dd.getUuidString());
Container container = new KeyValueContainer(containerData, conf);
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
scmId.toString());
containerSet.addContainer(container);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerType containerType : ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, conf, context,
containerSet, volumeSet, metrics));
}
HddsDispatcher hddsDispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, context, metrics);
hddsDispatcher.setScmId(scmId.toString());
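      // First write chunk request: the container is still mostly empty,
      // so no container close action should be queued.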
ContainerCommandResponseProto responseOne = hddsDispatcher
.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
responseOne.getResult());
verify(context, times(0))
.addContainerActionIfAbsent(Mockito.any(ContainerAction.class));
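      // Simulate a nearly full container (950 MB used of the 1 GB limit)
      // so that the next write chunk request triggers a container close
      // action.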
containerData.setBytesUsed(Double.valueOf(
StorageUnit.MB.toBytes(950)).longValue());
ContainerCommandResponseProto responseTwo = hddsDispatcher
.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
responseTwo.getResult());
verify(context, times(1))
.addContainerActionIfAbsent(Mockito.any(ContainerAction.class));
} finally {
volumeSet.shutdown();
FileUtils.deleteDirectory(new File(testDir));
}
}

  @Test
public void testCreateContainerWithWriteChunk() throws IOException {
String testDir =
GenericTestUtils.getTempPath(TestHddsDispatcher.class.getSimpleName());
try {
UUID scmId = UUID.randomUUID();
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HDDS_DATANODE_DIR_KEY, testDir);
DatanodeDetails dd = randomDatanodeDetails();
HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf);
ContainerCommandRequestProto writeChunkRequest =
getWriteChunkRequest(dd.getUuidString(), 1L, 1L);
// send read chunk request and make sure container does not exist
ContainerCommandResponseProto response =
hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest), null);
      Assert.assertEquals(ContainerProtos.Result.CONTAINER_NOT_FOUND,
          response.getResult());
// send write chunk request without sending create container
response = hddsDispatcher.dispatch(writeChunkRequest, null);
// container should be created as part of write chunk request
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
// send read chunk request to read the chunk written above
response =
hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest), null);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
      Assert.assertEquals(writeChunkRequest.getWriteChunk().getData(),
          response.getReadChunk().getData());
} finally {
FileUtils.deleteDirectory(new File(testDir));
}
}

  @Test
public void testWriteChunkWithCreateContainerFailure() throws IOException {
String testDir = GenericTestUtils.getTempPath(
TestHddsDispatcher.class.getSimpleName());
try {
UUID scmId = UUID.randomUUID();
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HDDS_DATANODE_DIR_KEY, testDir);
DatanodeDetails dd = randomDatanodeDetails();
HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf);
ContainerCommandRequestProto writeChunkRequest = getWriteChunkRequest(
dd.getUuidString(), 1L, 1L);
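      // Spy on the dispatcher so that container creation can be stubbed
      // to fail.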
HddsDispatcher mockDispatcher = Mockito.spy(hddsDispatcher);
ContainerCommandResponseProto.Builder builder = ContainerUtils
.getContainerCommandResponse(writeChunkRequest,
ContainerProtos.Result.DISK_OUT_OF_SPACE, "");
      // Stub createContainer to return a DISK_OUT_OF_SPACE response so that
      // the implicit container creation during write chunk fails.
Mockito.doReturn(builder.build()).when(mockDispatcher)
.createContainer(writeChunkRequest);
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(HddsDispatcher.LOG);
// send write chunk request without sending create container
mockDispatcher.dispatch(writeChunkRequest, null);
// verify the error log
assertTrue(logCapturer.getOutput()
.contains("ContainerID " + writeChunkRequest.getContainerID()
+ " creation failed : Result: DISK_OUT_OF_SPACE"));
} finally {
FileUtils.deleteDirectory(new File(testDir));
}
}

  /**
   * Creates an HddsDispatcher instance for the given datanode details,
   * SCM id and configuration.
   *
   * @param dd datanode details.
   * @param scmId SCM UUID.
   * @param conf configuration to be used.
   * @return HddsDispatcher instance.
   * @throws IOException in case of failure.
   */
private HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
OzoneConfiguration conf) throws IOException {
ContainerSet containerSet = new ContainerSet();
VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf);
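    // Mock the datanode state machine and state context so that the
    // dispatcher and handlers can resolve the datanode details.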
DatanodeStateMachine stateMachine = Mockito.mock(
DatanodeStateMachine.class);
StateContext context = Mockito.mock(StateContext.class);
Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(dd);
Mockito.when(context.getParent()).thenReturn(stateMachine);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerType containerType : ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, conf, context,
containerSet, volumeSet, metrics));
}
HddsDispatcher hddsDispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, context, metrics);
hddsDispatcher.setScmId(scmId.toString());
return hddsDispatcher;
}

  // Creates DatanodeDetails with a random UUID and dummy port numbers.
  // This method has to be removed once scm/TestUtils.java is moved from the
  // server-scm project to container-service or to the common project.
private static DatanodeDetails randomDatanodeDetails() {
DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
DatanodeDetails.Port.Name.STANDALONE, 0);
DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
DatanodeDetails.Port.Name.RATIS, 0);
DatanodeDetails.Port restPort = DatanodeDetails.newPort(
DatanodeDetails.Port.Name.REST, 0);
DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
builder.setUuid(UUID.randomUUID().toString())
.setHostName("localhost")
.setIpAddress("127.0.0.1")
.addPort(containerPort)
.addPort(ratisPort)
.addPort(restPort);
return builder.build();
}
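
  /**
   * Creates a container write chunk request with dummy chunk data for the
   * given datanode, container and block.
   *
   * @param datanodeId - Datanode UUID string
   * @param containerId - ID of the container
   * @param localId - Local ID of the block within the container
   * @return container write chunk request
   */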
private ContainerCommandRequestProto getWriteChunkRequest(
String datanodeId, Long containerId, Long localId) {
ByteString data = ByteString.copyFrom(
UUID.randomUUID().toString().getBytes(UTF_8));
ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo
.newBuilder()
.setChunkName(
DigestUtils.md5Hex("dummy-key") + "_stream_"
+ containerId + "_chunk_" + localId)
.setOffset(0)
.setLen(data.size())
.setChecksumData(Checksum.getNoChecksumDataProto())
.build();
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
.newBuilder()
.setBlockID(new BlockID(containerId, localId)
.getDatanodeBlockIDProtobuf())
.setChunkData(chunk)
.setData(data);
return ContainerCommandRequestProto
.newBuilder()
.setContainerID(containerId)
.setCmdType(ContainerProtos.Type.WriteChunk)
.setDatanodeUuid(datanodeId)
.setWriteChunk(writeChunkRequest)
.build();
}

  /**
* Creates container read chunk request using input container write chunk
* request.
*
* @param writeChunkRequest - Input container write chunk request
* @return container read chunk request
*/
private ContainerCommandRequestProto getReadChunkRequest(
ContainerCommandRequestProto writeChunkRequest) {
WriteChunkRequestProto writeChunk = writeChunkRequest.getWriteChunk();
ContainerProtos.ReadChunkRequestProto.Builder readChunkRequest =
ContainerProtos.ReadChunkRequestProto.newBuilder()
.setBlockID(writeChunk.getBlockID())
.setChunkData(writeChunk.getChunkData());
return ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.ReadChunk)
.setContainerID(writeChunk.getBlockID().getContainerID())
.setTraceID(writeChunkRequest.getTraceID())
.setDatanodeUuid(writeChunkRequest.getDatanodeUuid())
.setReadChunk(readChunkRequest)
.build();
}
}