/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.genesis;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Maps;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
    .ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
    .GetBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
    .PutBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
    .ReadChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
    .WriteChunkRequestProto;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine
    .DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

/**
 * JMH benchmark for the datanode {@link HddsDispatcher}: measures the cost of
 * dispatching CreateContainer, WriteChunk, ReadChunk, PutBlock and GetBlock
 * requests against a pre-populated container set.
 */
@State(Scope.Benchmark)
public class BenchMarkDatanodeDispatcher {
private String baseDir;
private String datanodeUuid;
private HddsDispatcher dispatcher;
private ByteString data;
private Random random;
private AtomicInteger containerCount;
private AtomicInteger keyCount;
private AtomicInteger chunkCount;
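
  // Sizes of the container/key/chunk pools pre-populated during setup.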
private static final int INIT_CONTAINERS = 100;
private static final int INIT_KEYS = 50;
private static final int INIT_CHUNKS = 100;
private List<Long> containers;
private List<Long> keys;
private List<String> chunks;
private VolumeSet volumeSet;
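
  /**
   * Creates the dispatcher over a temporary data directory and pre-populates
   * INIT_CONTAINERS containers, each holding INIT_KEYS blocks with one chunk
   * apiece, so the read benchmarks always find data.
   */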
@Setup(Level.Trial)
public void initialize() throws IOException {
datanodeUuid = UUID.randomUUID().toString();
    // 1 MiB (1048576 bytes) of random ASCII data, reused as the chunk
    // payload for every write
    data = ByteString.copyFromUtf8(RandomStringUtils.randomAscii(1048576));
random = new Random();
Configuration conf = new OzoneConfiguration();
baseDir = System.getProperty("java.io.tmpdir") + File.separator +
datanodeUuid;
    // point the datanode data directory at a fresh per-run temp location
conf.set("dfs.datanode.data.dir", baseDir + File.separator + "data");
ContainerSet containerSet = new ContainerSet();
volumeSet = new VolumeSet(datanodeUuid, conf);
StateContext context = new StateContext(
conf, DatanodeStates.RUNNING, null);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerProtos.ContainerType containerType :
ContainerProtos.ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(
containerType, conf, context, containerSet, volumeSet, metrics));
}
dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, handlers,
context, metrics);
dispatcher.init();
containerCount = new AtomicInteger();
keyCount = new AtomicInteger();
chunkCount = new AtomicInteger();
containers = new ArrayList<>();
keys = new ArrayList<>();
chunks = new ArrayList<>();
// Create containers
for (int x = 0; x < INIT_CONTAINERS; x++) {
long containerID = HddsUtils.getUtcTime() + x;
ContainerCommandRequestProto req = getCreateContainerCommand(containerID);
dispatcher.dispatch(req, null);
containers.add(containerID);
containerCount.getAndIncrement();
}
for (int x = 0; x < INIT_KEYS; x++) {
      keys.add(HddsUtils.getUtcTime() + x);
}
for (int x = 0; x < INIT_CHUNKS; x++) {
chunks.add("chunk-" + x);
}
    // Add chunks and keys to the containers
for (int x = 0; x < INIT_KEYS; x++) {
String chunkName = chunks.get(x);
chunkCount.getAndIncrement();
long key = keys.get(x);
keyCount.getAndIncrement();
for (int y = 0; y < INIT_CONTAINERS; y++) {
long containerID = containers.get(y);
BlockID blockID = new BlockID(containerID, key);
        dispatcher.dispatch(getPutBlockCommand(blockID, chunkName), null);
dispatcher.dispatch(getWriteChunkCommand(blockID, chunkName), null);
}
}
}
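
  /** Shuts down the volume set and removes the temporary data directory. */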
@TearDown(Level.Trial)
public void cleanup() throws IOException {
volumeSet.shutdown();
FileUtils.deleteDirectory(new File(baseDir));
}
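
  /** Builds a CreateContainer request for the given container ID. */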
private ContainerCommandRequestProto getCreateContainerCommand(
long containerID) {
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.CreateContainer);
request.setContainerID(containerID);
request.setCreateContainer(
ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
request.setDatanodeUuid(datanodeUuid);
request.setTraceID(containerID + "-trace");
return request.build();
}
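
  /** Builds a WriteChunk request that carries the shared 1 MiB payload. */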
private ContainerCommandRequestProto getWriteChunkCommand(
BlockID blockID, String chunkName) {
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(getChunkInfo(blockID, chunkName))
.setData(data);
ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto
.newBuilder();
request.setCmdType(ContainerProtos.Type.WriteChunk)
.setContainerID(blockID.getContainerID())
.setTraceID(getBlockTraceID(blockID))
.setDatanodeUuid(datanodeUuid)
.setWriteChunk(writeChunkRequest);
return request.build();
}
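
  /** Builds a ReadChunk request for the named chunk of the given block. */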
private ContainerCommandRequestProto getReadChunkCommand(
BlockID blockID, String chunkName) {
ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(getChunkInfo(blockID, chunkName));
ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto
.newBuilder();
request.setCmdType(ContainerProtos.Type.ReadChunk)
.setContainerID(blockID.getContainerID())
.setTraceID(getBlockTraceID(blockID))
.setDatanodeUuid(datanodeUuid)
.setReadChunk(readChunkRequest);
return request.build();
}
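
  /**
   * Derives chunk metadata whose name combines an MD5 of the chunk key with
   * the container and local block IDs, covering the whole payload.
   */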
private ContainerProtos.ChunkInfo getChunkInfo(
BlockID blockID, String chunkName) {
ContainerProtos.ChunkInfo.Builder builder =
ContainerProtos.ChunkInfo.newBuilder()
.setChunkName(
DigestUtils.md5Hex(chunkName)
+ "_stream_" + blockID.getContainerID() + "_block_"
+ blockID.getLocalID())
.setOffset(0).setLen(data.size());
return builder.build();
}
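
  /** Builds a PutBlock request whose block data references a single chunk. */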
private ContainerCommandRequestProto getPutBlockCommand(
BlockID blockID, String chunkKey) {
PutBlockRequestProto.Builder putBlockRequest = PutBlockRequestProto
.newBuilder()
.setBlockData(getBlockData(blockID, chunkKey));
ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto
.newBuilder();
request.setCmdType(ContainerProtos.Type.PutBlock)
.setContainerID(blockID.getContainerID())
.setTraceID(getBlockTraceID(blockID))
.setDatanodeUuid(datanodeUuid)
.setPutBlock(putBlockRequest);
return request.build();
}
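
  /** Builds a GetBlock request for the given block ID. */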
private ContainerCommandRequestProto getGetBlockCommand(BlockID blockID) {
GetBlockRequestProto.Builder readBlockRequest =
GetBlockRequestProto.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf());
ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(ContainerProtos.Type.GetBlock)
.setContainerID(blockID.getContainerID())
.setTraceID(getBlockTraceID(blockID))
.setDatanodeUuid(datanodeUuid)
.setGetBlock(readBlockRequest);
return request.build();
}
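
  /** Assembles block metadata holding one chunk entry for the block. */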
private ContainerProtos.BlockData getBlockData(
BlockID blockID, String chunkKey) {
ContainerProtos.BlockData.Builder builder = ContainerProtos.BlockData
.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addChunks(getChunkInfo(blockID, chunkKey));
return builder.build();
}
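
  // Each @Benchmark method below dispatches a single request against the
  // shared dispatcher; with @State(Scope.Benchmark) all JMH worker threads
  // operate on this one instance.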
@Benchmark
public void createContainer(BenchMarkDatanodeDispatcher bmdd) {
long containerID = RandomUtils.nextLong();
ContainerCommandRequestProto req = getCreateContainerCommand(containerID);
bmdd.dispatcher.dispatch(req, null);
bmdd.containers.add(containerID);
bmdd.containerCount.getAndIncrement();
}
@Benchmark
public void writeChunk(BenchMarkDatanodeDispatcher bmdd) {
bmdd.dispatcher.dispatch(getWriteChunkCommand(
getRandomBlockID(), getNewChunkToWrite()), null);
}
@Benchmark
public void readChunk(BenchMarkDatanodeDispatcher bmdd) {
BlockID blockID = getRandomBlockID();
String chunkKey = getRandomChunkToRead();
bmdd.dispatcher.dispatch(getReadChunkCommand(blockID, chunkKey), null);
}
@Benchmark
public void putBlock(BenchMarkDatanodeDispatcher bmdd) {
BlockID blockID = getRandomBlockID();
String chunkKey = getNewChunkToWrite();
bmdd.dispatcher.dispatch(getPutBlockCommand(blockID, chunkKey), null);
}
@Benchmark
public void getBlock(BenchMarkDatanodeDispatcher bmdd) {
BlockID blockID = getRandomBlockID();
bmdd.dispatcher.dispatch(getGetBlockCommand(blockID), null);
}
  // Chunk writes from the benchmark methods only reach certain containers.
  // Use the INIT_* constants instead of the live counters to guarantee the
  // selected keys/chunks were written during setup and are readable.
private BlockID getRandomBlockID() {
return new BlockID(getRandomContainerID(), getRandomKeyID());
}
private long getRandomContainerID() {
return containers.get(random.nextInt(INIT_CONTAINERS));
}
private long getRandomKeyID() {
return keys.get(random.nextInt(INIT_KEYS));
}
private String getRandomChunkToRead() {
return chunks.get(random.nextInt(INIT_CHUNKS));
}
private String getNewChunkToWrite() {
return "chunk-" + chunkCount.getAndIncrement();
}
private String getBlockTraceID(BlockID blockID) {
    return blockID.getContainerID() + "-" + blockID.getLocalID() + "-trace";
}
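
  // A minimal sketch of launching just this benchmark through the plain JMH
  // runner API (an assumption for illustration; the Genesis suite may drive
  // it differently). Fully-qualified names avoid extra imports.
  public static void main(String[] args)
      throws org.openjdk.jmh.runner.RunnerException {
    org.openjdk.jmh.runner.options.Options opts =
        new org.openjdk.jmh.runner.options.OptionsBuilder()
            .include(BenchMarkDatanodeDispatcher.class.getSimpleName())
            .forks(1)
            .build();
    new org.openjdk.jmh.runner.Runner(opts).run();
  }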
}