blob: 7f32be5c04c5b35806b508f0687a2e4424dbc7ba [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
.Builder;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.replication
.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Matchers.anyLong;
import org.mockito.Mockito;
import static org.mockito.Mockito.when;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Drives a {@link ContainerReportHandler} with a scripted sequence of
 * datanode container reports and inspects the events it publishes.
 *
 * <p>The test class acts as its own {@link EventPublisher}: every payload
 * passed to {@link #fireEvent} is logged and appended to an in-memory list.
 */
public class TestContainerReportHandler implements EventPublisher {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestContainerReportHandler.class);

  /** Payloads captured by {@link #fireEvent}, in the order they were fired. */
  private final List<Object> publishedEvents = new ArrayList<>();

  private final NodeManager nodeManager = new MockNodeManager(true, 1);

  /** Empties the captured event list before each test. */
  @Before
  public void resetEventCollector() {
    publishedEvents.clear();
  }

  /**
   * Sends container reports from four datanodes to the handler and checks,
   * after each step, how many events were published and what the published
   * {@link ReplicationRequest}s contain.
   *
   * @throws IOException if container allocation, closing or node
   *     registration fails
   */
  @Test
  public void test() throws IOException {
    // GIVEN
    // A mocked container manager that answers every lookup with a CLOSED,
    // replication-factor-THREE container carrying the requested id.
    OzoneConfiguration configuration = new OzoneConfiguration();
    ContainerManager containerManager = Mockito.mock(ContainerManager.class);
    PipelineSelector stateManagerSelector =
        Mockito.mock(PipelineSelector.class);

    Answer<ContainerInfo> closedContainerAnswer = invocation -> {
      long requestedId = (Long) invocation.getArguments()[0];
      return new Builder()
          .setReplicationFactor(ReplicationFactor.THREE)
          .setContainerID(requestedId)
          .setState(LifeCycleState.CLOSED)
          .build();
    };
    when(containerManager.getContainer(anyLong()))
        .thenAnswer(closedContainerAnswer);

    ContainerStateManager containerStateManager = new ContainerStateManager(
        configuration, containerManager, stateManagerSelector);
    when(containerManager.getStateManager())
        .thenReturn(containerStateManager);

    ReplicationActivityStatus replicationActivityStatus =
        new ReplicationActivityStatus();
    ContainerReportHandler reportHandler = new ContainerReportHandler(
        containerManager, nodeManager, replicationActivityStatus);

    // Four datanodes, each registered with an empty container set.
    DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
    DatanodeDetails dn2 = TestUtils.randomDatanodeDetails();
    DatanodeDetails dn3 = TestUtils.randomDatanodeDetails();
    DatanodeDetails dn4 = TestUtils.randomDatanodeDetails();
    for (DatanodeDetails datanode
        : new DatanodeDetails[] {dn1, dn2, dn3, dn4}) {
      nodeManager.addDatanodeInContainerMap(
          datanode.getUuid(), new HashSet<>());
    }

    // A second selector mock, used only for allocation, that always hands
    // out the same stand-alone pipeline.
    PipelineSelector allocationSelector =
        Mockito.mock(PipelineSelector.class);
    Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED,
        ReplicationType.STAND_ALONE, ReplicationFactor.THREE,
        PipelineID.randomId());
    when(allocationSelector.getReplicationPipeline(
        ReplicationType.STAND_ALONE, ReplicationFactor.THREE))
        .thenReturn(pipeline);

    ContainerInfo firstContainer =
        allocateStandaloneContainer(containerStateManager, allocationSelector);
    ContainerInfo secondContainer =
        allocateStandaloneContainer(containerStateManager, allocationSelector);
    // The third container is the one that is left open.
    ContainerInfo openContainer =
        allocateStandaloneContainer(containerStateManager, allocationSelector);

    long c1 = firstContainer.getContainerID();
    long c2 = secondContainer.getContainerID();
    long c3 = openContainer.getContainerID();

    // Close the other two containers.
    TestUtils.closeContainer(containerStateManager, firstContainer);
    TestUtils.closeContainer(containerStateManager, secondContainer);

    // WHEN
    // Initial reports, sent before replication is enabled: c1 and c2 are
    // each reported by three datanodes.
    report(reportHandler, dn1, c1, c2, c3);
    report(reportHandler, dn2, c1, c2, c3);
    report(reportHandler, dn3, c1, c2);
    report(reportHandler, dn4);
    Assert.assertEquals(0, publishedEvents.size());

    replicationActivityStatus.enableReplication();

    // dn1 still reports both closed containers: nothing to publish.
    report(reportHandler, dn1, c1, c2);
    Assert.assertEquals(0, publishedEvents.size());

    // c2 is missing from dn2, leaving it on dn1 and dn3 only.
    report(reportHandler, dn2, c1);
    Assert.assertEquals(1, publishedEvents.size());
    assertReplicationRequest(publishedEvents.get(0), c2, 3, 2);

    // c2 was replicated to dn4.
    report(reportHandler, dn4, c2);
    // No new event: c2 is back at three replicas.
    Assert.assertEquals(1, publishedEvents.size());

    // c2 shows up on dn2 again (it was missing before, magic).
    report(reportHandler, dn2, c1, c2);
    // c2 is now over-replicated (dn1, dn2, dn3, dn4).
    Assert.assertEquals(2, publishedEvents.size());
    assertReplicationRequest(publishedEvents.get(1), c2, 3, 4);
  }

  /**
   * Allocates one STAND_ALONE, factor-THREE container owned by "root".
   *
   * @param stateManager state manager performing the allocation
   * @param selector pipeline selector handed to the allocation call
   * @return the info object of the newly allocated container
   * @throws IOException if the allocation fails
   */
  private static ContainerInfo allocateStandaloneContainer(
      ContainerStateManager stateManager, PipelineSelector selector)
      throws IOException {
    return stateManager
        .allocateContainer(selector, ReplicationType.STAND_ALONE,
            ReplicationFactor.THREE, "root")
        .getContainerInfo();
  }

  /**
   * Delivers a container report for {@code datanode} to {@code handler},
   * with this test instance as the event publisher.
   *
   * @param handler handler under test
   * @param datanode datanode the report is attributed to
   * @param containerIds ids of the containers listed in the report; may be
   *     empty
   */
  private void report(ContainerReportHandler handler,
      DatanodeDetails datanode, long... containerIds) {
    ContainerReportFromDatanode message = new ContainerReportFromDatanode(
        datanode, createContainerReport(containerIds));
    handler.onMessage(message, this);
  }

  /**
   * Asserts that {@code event} is a {@link ReplicationRequest} with the
   * given container id and replica counts.
   *
   * @param event captured payload, expected to be a replication request
   * @param containerId expected container id
   * @param expectedReplicas expected value of the desired replica count
   * @param reportedReplicas expected value of the current replica count
   */
  private static void assertReplicationRequest(Object event,
      long containerId, int expectedReplicas, int reportedReplicas) {
    ReplicationRequest request = (ReplicationRequest) event;
    Assert.assertEquals(containerId, request.getContainerId());
    Assert.assertEquals(expectedReplicas,
        request.getExpecReplicationCount());
    Assert.assertEquals(reportedReplicas, request.getReplicationCount());
  }

  /**
   * Builds a container report holding one entry per id. Every entry uses
   * the same fixed hash, size and counter values; only the container id
   * differs.
   *
   * @param containerIds ids to include in the report
   * @return the assembled report
   */
  private ContainerReportsProto createContainerReport(long[] containerIds) {
    ContainerReportsProto.Builder reports = ContainerReportsProto.newBuilder();
    for (long id : containerIds) {
      // Fully qualified: the simple name ContainerInfo is already taken by
      // the imported SCM helper class.
      reports.addReports(
          org.apache.hadoop.hdds.protocol.proto
              .StorageContainerDatanodeProtocolProtos.ContainerInfo
              .newBuilder()
              .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
              .setSize(5368709120L)
              .setUsed(2000000000L)
              .setKeyCount(100000000L)
              .setReadCount(100000000L)
              .setWriteCount(100000000L)
              .setReadBytes(2000000000L)
              .setWriteBytes(2000000000L)
              .setContainerID(id)
              .setDeleteTransactionId(0)
              .build());
    }
    return reports.build();
  }

  /**
   * Logs the payload and records it in the captured event list; the event
   * type itself is not stored.
   */
  @Override
  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
      EVENT_TYPE event, PAYLOAD payload) {
    LOG.info("Event is published: {}", payload);
    publishedEvents.add(payload);
  }
}