blob: 3967c0ccf722c05bf945d0fb5ddce761fabc8b8a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import com.google.common.collect.Maps;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.*;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import static org.junit.Assert.assertTrue;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.util.function.CheckedBiConsumer;
import java.util.Map;
import java.util.function.BiConsumer;
import org.junit.Test;
import org.junit.Assert;
/**
* This class tests the metrics of ContainerStateMachine.
*/
public class TestCSMMetrics {
static final String TEST_DIR =
GenericTestUtils.getTestDir("dfs").getAbsolutePath()
+ File.separator;
@FunctionalInterface
interface CheckedBiFunction<LEFT, RIGHT, OUT, THROWABLE extends Throwable> {
OUT apply(LEFT left, RIGHT right) throws THROWABLE;
}
@Test
public void testContainerStateMachineMetrics() throws Exception {
runContainerStateMachineMetrics(1,
(pipeline, conf) -> RatisTestHelper.initRatisConf(GRPC, conf),
XceiverClientRatis::newXceiverClientRatis,
TestCSMMetrics::newXceiverServerRatis,
(dn, p) -> RatisTestHelper.initXceiverServerRatis(GRPC, dn, p));
}
static void runContainerStateMachineMetrics(
int numDatanodes,
BiConsumer<Pipeline, OzoneConfiguration> initConf,
TestCSMMetrics.CheckedBiFunction<Pipeline, OzoneConfiguration,
XceiverClientSpi, IOException> createClient,
TestCSMMetrics.CheckedBiFunction<DatanodeDetails, OzoneConfiguration,
XceiverServerSpi, IOException> createServer,
CheckedBiConsumer<DatanodeDetails, Pipeline, IOException> initServer)
throws Exception {
final List<XceiverServerSpi> servers = new ArrayList<>();
XceiverClientSpi client = null;
String containerName = OzoneUtils.getRequestID();
try {
final Pipeline pipeline = ContainerTestHelper.createPipeline(
numDatanodes);
final OzoneConfiguration conf = new OzoneConfiguration();
initConf.accept(pipeline, conf);
for (DatanodeDetails dn : pipeline.getNodes()) {
final XceiverServerSpi s = createServer.apply(dn, conf);
servers.add(s);
s.start();
initServer.accept(dn, pipeline);
}
client = createClient.apply(pipeline, conf);
client.connect();
// Before Read Chunk/Write Chunk
MetricsRecordBuilder metric = getMetrics(CSMMetrics.SOURCE_NAME +
RaftGroupId.valueOf(pipeline.getId().getId()).toString());
assertCounter("NumWriteStateMachineOps", 0L, metric);
assertCounter("NumReadStateMachineOps", 0L, metric);
assertCounter("NumApplyTransactionOps", 0L, metric);
assertCounter("NumBytesWrittenCount", 0L, metric);
assertCounter("NumBytesCommittedCount", 0L, metric);
assertCounter("NumStartTransactionVerifyFailures", 0L, metric);
assertCounter("NumContainerNotOpenVerifyFailures", 0L, metric);
assertCounter("WriteChunkNumOps", 0L, metric);
double applyTransactionLatency = getDoubleGauge(
"ApplyTransactionAvgTime", metric);
assertTrue(applyTransactionLatency == 0.0);
double writeStateMachineLatency = getDoubleGauge(
"WriteStateMachineDataAvgTime", metric);
assertTrue(writeStateMachineLatency == 0.0);
// Write Chunk
BlockID blockID = ContainerTestHelper.getTestBlockID(ContainerTestHelper.
getTestContainerID());
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(
pipeline, blockID, 1024);
ContainerCommandResponseProto response =
client.sendCommand(writeChunkRequest);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
metric = getMetrics(CSMMetrics.SOURCE_NAME +
RaftGroupId.valueOf(pipeline.getId().getId()).toString());
assertCounter("NumWriteStateMachineOps", 1L, metric);
assertCounter("NumBytesWrittenCount", 1024L, metric);
assertCounter("NumApplyTransactionOps", 1L, metric);
assertCounter("NumBytesCommittedCount", 1024L, metric);
assertCounter("NumStartTransactionVerifyFailures", 0L, metric);
assertCounter("NumContainerNotOpenVerifyFailures", 0L, metric);
assertCounter("WriteChunkNumOps", 1L, metric);
//Read Chunk
ContainerProtos.ContainerCommandRequestProto readChunkRequest =
ContainerTestHelper.getReadChunkRequest(pipeline, writeChunkRequest
.getWriteChunk());
response = client.sendCommand(readChunkRequest);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
metric = getMetrics(CSMMetrics.SOURCE_NAME +
RaftGroupId.valueOf(pipeline.getId().getId()).toString());
assertCounter("NumQueryStateMachineOps", 1L, metric);
assertCounter("NumApplyTransactionOps", 1L, metric);
applyTransactionLatency = getDoubleGauge(
"ApplyTransactionAvgTime", metric);
assertTrue(applyTransactionLatency > 0.0);
writeStateMachineLatency = getDoubleGauge(
"WriteStateMachineDataAvgTime", metric);
assertTrue(writeStateMachineLatency > 0.0);
} finally {
if (client != null) {
client.close();
}
servers.stream().forEach(XceiverServerSpi::stop);
}
}
static XceiverServerRatis newXceiverServerRatis(
DatanodeDetails dn, OzoneConfiguration conf) throws IOException {
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
final String dir = TEST_DIR + dn.getUuid();
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
final ContainerDispatcher dispatcher = new TestContainerDispatcher();
return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
new ContainerController(new ContainerSet(), Maps.newHashMap()),
null, null);
}
private static class TestContainerDispatcher implements ContainerDispatcher {
/**
* Dispatches commands to container layer.
*
* @param msg - Command Request
* @return Command Response
*/
@Override
public ContainerCommandResponseProto dispatch(
ContainerCommandRequestProto msg,
DispatcherContext context) {
return ContainerTestHelper.getCreateContainerResponse(msg);
}
@Override
public void validateContainerCommand(
ContainerCommandRequestProto msg) throws StorageContainerException {
}
@Override
public void init() {
}
@Override
public void shutdown() {
}
@Override
public Handler getHandler(ContainerProtos.ContainerType containerType) {
return null;
}
@Override
public void setScmId(String scmId) {
}
@Override
public void buildMissingContainerSetAndValidate(
Map<Long, Long> container2BCSIDMap) {
}
}
}