/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */
package org.apache.hadoop.ozone.recon.api;

import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Callable;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageTypeProto;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.recon.ReconTestInjector;
import org.apache.hadoop.ozone.recon.api.types.ClusterStateResponse;
import org.apache.hadoop.ozone.recon.api.types.DatanodeMetadata;
import org.apache.hadoop.ozone.recon.api.types.DatanodesResponse;
import org.apache.hadoop.ozone.recon.api.types.PipelineMetadata;
import org.apache.hadoop.ozone.recon.api.types.PipelinesResponse;
import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Tests for the Recon API endpoints: datanodes, pipelines, and cluster state.
 */
public class TestEndpoints extends AbstractReconSqlDBTest {
private NodeEndpoint nodeEndpoint;
private PipelineEndpoint pipelineEndpoint;
private ClusterStateEndpoint clusterStateEndpoint;
private ReconOMMetadataManager reconOMMetadataManager;
private ReconStorageContainerManagerFacade reconScm;
private boolean isSetupDone = false;
private String pipelineId;
private DatanodeDetails datanodeDetails;
private DatanodeDetails datanodeDetails2;
private long containerId = 1L;
private ContainerReportsProto containerReportsProto;
private DatanodeDetailsProto datanodeDetailsProto;
private Pipeline pipeline;
private final String host1 = "host1.datanode";
private final String host2 = "host2.datanode";
private final String ip1 = "1.1.1.1";
  private final String ip2 = "2.2.2.2";

  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  private void initializeInjector() throws IOException {
reconOMMetadataManager = getTestReconOmMetadataManager(
initializeNewOmMetadataManager(temporaryFolder.newFolder()),
temporaryFolder.newFolder());
datanodeDetails = randomDatanodeDetails();
datanodeDetails2 = randomDatanodeDetails();
datanodeDetails.setHostName(host1);
datanodeDetails.setIpAddress(ip1);
datanodeDetails2.setHostName(host2);
datanodeDetails2.setIpAddress(ip2);
pipeline = getRandomPipeline(datanodeDetails);
pipelineId = pipeline.getId().getId().toString();
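    // One OPEN, RATIS, factor-ONE container associated with the pipeline
    // above.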
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setContainerID(containerId)
.setReplicationFactor(ReplicationFactor.ONE)
.setState(LifeCycleState.OPEN)
.setOwner("test")
.setPipelineID(pipeline.getId())
.setReplicationType(ReplicationType.RATIS)
.build();
ContainerWithPipeline containerWithPipeline =
new ContainerWithPipeline(containerInfo, pipeline);
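    // Mock the SCM client and the SCM service provider so the endpoints
    // can be exercised without a live SCM.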
StorageContainerLocationProtocol mockScmClient = mock(
StorageContainerLocationProtocol.class);
StorageContainerServiceProvider mockScmServiceProvider = mock(
StorageContainerServiceProviderImpl.class);
when(mockScmServiceProvider.getPipeline(
pipeline.getId().getProtobuf())).thenReturn(pipeline);
when(mockScmServiceProvider.getContainerWithPipeline(containerId))
.thenReturn(containerWithPipeline);
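    // Wire the test injector with the mocked providers and bind the real
    // Recon SCM facade as the OzoneStorageContainerManager implementation.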
ReconTestInjector reconTestInjector =
new ReconTestInjector.Builder(temporaryFolder)
.withReconSqlDb()
.withReconOm(reconOMMetadataManager)
.withOmServiceProvider(mock(OzoneManagerServiceProviderImpl.class))
.addBinding(StorageContainerServiceProvider.class,
mockScmServiceProvider)
.addBinding(OzoneStorageContainerManager.class,
ReconStorageContainerManagerFacade.class)
.addBinding(ClusterStateEndpoint.class)
.addBinding(NodeEndpoint.class)
.addBinding(ContainerSchemaManager.class)
.addBinding(StorageContainerLocationProtocol.class, mockScmClient)
.build();
nodeEndpoint = reconTestInjector.getInstance(NodeEndpoint.class);
pipelineEndpoint = reconTestInjector.getInstance(PipelineEndpoint.class);
clusterStateEndpoint =
reconTestInjector.getInstance(ClusterStateEndpoint.class);
reconScm = (ReconStorageContainerManagerFacade)
reconTestInjector.getInstance(OzoneStorageContainerManager.class);
  }

  @Before
  public void setUp() throws Exception {
// The following setup runs only once
if (!isSetupDone) {
initializeInjector();
isSetupDone = true;
}
String datanodeId = datanodeDetails.getUuid().toString();
String datanodeId2 = datanodeDetails2.getUuid().toString();
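    // Container report from datanode 1: a single OPEN replica of the
    // container created in initializeInjector().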
containerReportsProto =
ContainerReportsProto.newBuilder()
.addReports(
ContainerReplicaProto.newBuilder()
.setContainerID(containerId)
.setState(ContainerReplicaProto.State.OPEN)
.setOriginNodeId(datanodeId)
.build())
.build();
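    // Datanode 1 also reports the pipeline and claims leadership of it.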
PipelineReport pipelineReport = PipelineReport.newBuilder()
.setPipelineID(
PipelineID.newBuilder().setId(pipelineId).build())
.setIsLeader(true)
.build();
PipelineReportsProto pipelineReportsProto =
PipelineReportsProto.newBuilder()
.addPipelineReport(pipelineReport).build();
datanodeDetailsProto =
DatanodeDetailsProto.newBuilder()
.setHostName(host1)
.setUuid(datanodeId)
.setIpAddress(ip1)
.build();
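    // Two DISK volumes for datanode 1; Recon aggregates them to
    // capacity 75000, used 35000, remaining 15400.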
StorageReportProto storageReportProto1 =
StorageReportProto.newBuilder().setStorageType(StorageTypeProto.DISK)
.setStorageLocation("/disk1").setScmUsed(10000).setRemaining(5400)
.setCapacity(25000)
.setStorageUuid(UUID.randomUUID().toString())
.setFailed(false).build();
StorageReportProto storageReportProto2 =
StorageReportProto.newBuilder().setStorageType(StorageTypeProto.DISK)
.setStorageLocation("/disk2").setScmUsed(25000).setRemaining(10000)
.setCapacity(50000)
.setStorageUuid(UUID.randomUUID().toString())
.setFailed(false).build();
NodeReportProto nodeReportProto =
NodeReportProto.newBuilder()
.addStorageReport(storageReportProto1)
.addStorageReport(storageReportProto2).build();
DatanodeDetailsProto datanodeDetailsProto2 =
DatanodeDetailsProto.newBuilder()
.setHostName(host2)
.setUuid(datanodeId2)
.setIpAddress(ip2)
.build();
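    // Two DISK volumes for datanode 2; Recon aggregates them to
    // capacity 130000, used 80000, remaining 17800.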
StorageReportProto storageReportProto3 =
StorageReportProto.newBuilder().setStorageType(StorageTypeProto.DISK)
.setStorageLocation("/disk1").setScmUsed(20000).setRemaining(7800)
.setCapacity(50000)
.setStorageUuid(UUID.randomUUID().toString())
.setFailed(false).build();
StorageReportProto storageReportProto4 =
StorageReportProto.newBuilder().setStorageType(StorageTypeProto.DISK)
.setStorageLocation("/disk2").setScmUsed(60000).setRemaining(10000)
.setCapacity(80000)
.setStorageUuid(UUID.randomUUID().toString())
.setFailed(false).build();
NodeReportProto nodeReportProto2 =
NodeReportProto.newBuilder()
.addStorageReport(storageReportProto3)
.addStorageReport(storageReportProto4).build();
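    // Register both datanodes; only datanode 1 reports a container and a
    // pipeline, while datanode 2 sends empty reports.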
try {
reconScm.getDatanodeProtocolServer()
.register(datanodeDetailsProto, nodeReportProto,
containerReportsProto, pipelineReportsProto);
reconScm.getDatanodeProtocolServer()
.register(datanodeDetailsProto2, nodeReportProto2,
ContainerReportsProto.newBuilder().build(),
PipelineReportsProto.newBuilder().build());
// Process all events in the event queue
reconScm.getEventQueue().processAll(1000);
} catch (Exception ex) {
Assert.fail(ex.getMessage());
}
    // Write data to OM.
    // A sample volume (sampleVol) and a bucket (bucketOne) are already
    // created in AbstractOMMetadataManagerTest.
    // Create a new volume and bucket, then write keys to the bucket.
String volumeKey = reconOMMetadataManager.getVolumeKey("sampleVol2");
OmVolumeArgs args =
OmVolumeArgs.newBuilder()
.setVolume("sampleVol2")
.setAdminName("TestUser")
.setOwnerName("TestUser")
.build();
reconOMMetadataManager.getVolumeTable().put(volumeKey, args);
OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
.setVolumeName("sampleVol2")
.setBucketName("bucketOne")
.build();
String bucketKey = reconOMMetadataManager.getBucketKey(
bucketInfo.getVolumeName(), bucketInfo.getBucketName());
reconOMMetadataManager.getBucketTable().put(bucketKey, bucketInfo);
// key = key_one
writeDataToOm(reconOMMetadataManager, "key_one");
// key = key_two
writeDataToOm(reconOMMetadataManager, "key_two");
// key = key_three
writeDataToOm(reconOMMetadataManager, "key_three");
  }

  private void testDatanodeResponse(DatanodeMetadata datanodeMetadata)
      throws IOException {
String hostname = datanodeMetadata.getHostname();
switch (hostname) {
case host1:
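      // Aggregated from host1's two storage reports:
      // capacity = 25000 + 50000, used = 10000 + 25000,
      // remaining = 5400 + 10000.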
Assert.assertEquals(75000,
datanodeMetadata.getDatanodeStorageReport().getCapacity());
Assert.assertEquals(15400,
datanodeMetadata.getDatanodeStorageReport().getRemaining());
Assert.assertEquals(35000,
datanodeMetadata.getDatanodeStorageReport().getUsed());
Assert.assertEquals(1, datanodeMetadata.getPipelines().size());
Assert.assertEquals(pipelineId,
datanodeMetadata.getPipelines().get(0).getPipelineID().toString());
Assert.assertEquals(pipeline.getFactor().getNumber(),
datanodeMetadata.getPipelines().get(0).getReplicationFactor());
Assert.assertEquals(pipeline.getType().toString(),
datanodeMetadata.getPipelines().get(0).getReplicationType());
Assert.assertEquals(pipeline.getLeaderNode().getHostName(),
datanodeMetadata.getPipelines().get(0).getLeaderNode());
Assert.assertEquals(1, datanodeMetadata.getLeaderCount());
break;
case host2:
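      // Aggregated from host2's two storage reports:
      // capacity = 50000 + 80000, used = 20000 + 60000,
      // remaining = 7800 + 10000.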
Assert.assertEquals(130000,
datanodeMetadata.getDatanodeStorageReport().getCapacity());
Assert.assertEquals(17800,
datanodeMetadata.getDatanodeStorageReport().getRemaining());
Assert.assertEquals(80000,
datanodeMetadata.getDatanodeStorageReport().getUsed());
Assert.assertEquals(0, datanodeMetadata.getPipelines().size());
Assert.assertEquals(0, datanodeMetadata.getLeaderCount());
break;
default:
Assert.fail(String.format("Datanode %s not registered",
hostname));
}
  }

  @Test
  public void testGetDatanodes() throws Exception {
Response response = nodeEndpoint.getDatanodes();
DatanodesResponse datanodesResponse =
(DatanodesResponse) response.getEntity();
Assert.assertEquals(2, datanodesResponse.getTotalCount());
Assert.assertEquals(2, datanodesResponse.getDatanodes().size());
datanodesResponse.getDatanodes().forEach(datanodeMetadata -> {
try {
testDatanodeResponse(datanodeMetadata);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
});
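    // After another heartbeat, the container count reported for host1
    // should reach 1.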
waitAndCheckConditionAfterHeartbeat(() -> {
Response response1 = nodeEndpoint.getDatanodes();
DatanodesResponse datanodesResponse1 =
(DatanodesResponse) response1.getEntity();
DatanodeMetadata datanodeMetadata1 =
datanodesResponse1.getDatanodes().stream().filter(datanodeMetadata ->
datanodeMetadata.getHostname().equals("host1.datanode"))
.findFirst().orElse(null);
return (datanodeMetadata1 != null &&
datanodeMetadata1.getContainers() == 1);
});
Assert.assertEquals(1,
reconScm.getPipelineManager()
.getContainersInPipeline(pipeline.getId()).size());
  }

  @Test
  public void testGetPipelines() throws Exception {
Response response = pipelineEndpoint.getPipelines();
PipelinesResponse pipelinesResponse =
(PipelinesResponse) response.getEntity();
Assert.assertEquals(1, pipelinesResponse.getTotalCount());
Assert.assertEquals(1, pipelinesResponse.getPipelines().size());
PipelineMetadata pipelineMetadata =
pipelinesResponse.getPipelines().iterator().next();
Assert.assertEquals(1, pipelineMetadata.getDatanodes().size());
Assert.assertEquals(pipeline.getType().toString(),
pipelineMetadata.getReplicationType());
Assert.assertEquals(pipeline.getFactor().getNumber(),
pipelineMetadata.getReplicationFactor());
Assert.assertEquals(datanodeDetails.getHostName(),
pipelineMetadata.getLeaderNode());
Assert.assertEquals(pipeline.getId().getId(),
pipelineMetadata.getPipelineId());
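    // After another heartbeat, the pipeline should report one container.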
waitAndCheckConditionAfterHeartbeat(() -> {
Response response1 = pipelineEndpoint.getPipelines();
PipelinesResponse pipelinesResponse1 =
(PipelinesResponse) response1.getEntity();
PipelineMetadata pipelineMetadata1 =
pipelinesResponse1.getPipelines().iterator().next();
return (pipelineMetadata1.getContainers() == 1);
});
  }

  @Test
  public void testGetClusterState() throws Exception {
Response response = clusterStateEndpoint.getClusterState();
ClusterStateResponse clusterStateResponse =
(ClusterStateResponse) response.getEntity();
Assert.assertEquals(1, clusterStateResponse.getPipelines());
Assert.assertEquals(2, clusterStateResponse.getVolumes());
Assert.assertEquals(2, clusterStateResponse.getBuckets());
Assert.assertEquals(3, clusterStateResponse.getKeys());
Assert.assertEquals(2, clusterStateResponse.getTotalDatanodes());
Assert.assertEquals(2, clusterStateResponse.getHealthyDatanodes());
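    // After another heartbeat, the cluster-wide container count should
    // reach 1.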
waitAndCheckConditionAfterHeartbeat(() -> {
Response response1 = clusterStateEndpoint.getClusterState();
ClusterStateResponse clusterStateResponse1 =
(ClusterStateResponse) response1.getEntity();
return (clusterStateResponse1.getContainers() == 1);
});
  }

  private void waitAndCheckConditionAfterHeartbeat(Callable<Boolean> check)
      throws Exception {
    // If a container report is processed before its pipeline exists, the
    // container is not added until the next container report, so send one
    // more heartbeat and then poll until the check passes.
SCMHeartbeatRequestProto heartbeatRequestProto =
SCMHeartbeatRequestProto.newBuilder()
.setContainerReport(containerReportsProto)
.setDatanodeDetails(datanodeDetailsProto)
.build();
reconScm.getDatanodeProtocolServer().sendHeartbeat(heartbeatRequestProto);
LambdaTestUtils.await(30000, 1000, check);
}
}