blob: 63735f77b7abd46cfbc5d1ef8d0819c807b8093d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.lock.LockManager;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.hadoop.hdds.scm.TestUtils.createDatanodeDetails;
import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
import static org.apache.hadoop.hdds.scm.TestUtils.randomDatanodeDetails;
/**
* Test cases to verify the functionality of ReplicationManager.
*/
public class TestReplicationManager {
// The ReplicationManager under test; created and started in setup().
private ReplicationManager replicationManager;
// Holds the containers and replicas loaded by each test; the mocked
// ContainerManager built in setup() delegates its lookups to it.
private ContainerStateManager containerStateManager;
// Mockito mock whose chooseDatanodes(..) is stubbed in setup() to return
// the requested number of random datanodes.
private PlacementPolicy containerPlacementPolicy;
// Event queue handed to the ReplicationManager.
private EventQueue eventQueue;
// Handler registered for SCMEvents.DATANODE_COMMAND; records every command
// it receives so that tests can assert on them.
private DatanodeCommandHandler datanodeCommandHandler;
/**
 * Builds a fresh ReplicationManager for every test.
 *
 * The ContainerManager is a mock that forwards its lookups to a
 * ContainerStateManager, the placement policy is a mock that answers with
 * random datanodes, and all datanode commands are captured by a
 * DatanodeCommandHandler registered on the event queue.
 */
@Before
public void setup() throws IOException, InterruptedException {
  final Configuration configuration = new OzoneConfiguration();

  // State holder and command recorder shared with the test methods.
  containerStateManager = new ContainerStateManager(configuration);
  datanodeCommandHandler = new DatanodeCommandHandler();
  eventQueue = new EventQueue();
  eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, datanodeCommandHandler);

  // ContainerManager mock: every lookup is served by containerStateManager.
  final ContainerManager mockedContainerManager =
      Mockito.mock(ContainerManager.class);
  Mockito.when(mockedContainerManager.getContainerIDs())
      .thenAnswer(call -> containerStateManager.getAllContainerIDs());
  Mockito.when(
      mockedContainerManager.getContainer(Mockito.any(ContainerID.class)))
      .thenAnswer(call -> {
        final ContainerID requested = (ContainerID) call.getArguments()[0];
        return containerStateManager.getContainer(requested);
      });
  Mockito.when(
      mockedContainerManager.getContainerReplicas(
          Mockito.any(ContainerID.class)))
      .thenAnswer(call -> {
        final ContainerID requested = (ContainerID) call.getArguments()[0];
        return containerStateManager.getContainerReplicas(requested);
      });

  // Placement policy mock: answer with as many random nodes as requested.
  containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);
  Mockito.when(containerPlacementPolicy.chooseDatanodes(
      Mockito.anyListOf(DatanodeDetails.class),
      Mockito.anyListOf(DatanodeDetails.class),
      Mockito.anyInt(), Mockito.anyLong()))
      .thenAnswer(call -> {
        final int requested = (int) call.getArguments()[2];
        final List<DatanodeDetails> chosen = new ArrayList<>();
        for (int i = 0; i < requested; i++) {
          chosen.add(randomDatanodeDetails());
        }
        return chosen;
      });

  replicationManager = new ReplicationManager(
      new ReplicationManagerConfiguration(),
      mockedContainerManager,
      containerPlacementPolicy,
      eventQueue,
      new LockManager<>(configuration));
  replicationManager.start();
  // Short pause after start() before the test body runs.
  Thread.sleep(100L);
}
/**
 * Checks that the ReplicationManager can be stopped and started again.
 *
 * stop() is a non-blocking call, so instead of sleeping for a fixed amount
 * of time the test polls isRunning() until the manager reports that it has
 * stopped or a generous deadline expires. This keeps the test fast on a
 * quick machine and avoids spurious failures on a slow one.
 *
 * @throws InterruptedException if the test thread is interrupted while
 *         waiting for the shutdown
 */
@Test
public void testReplicationManagerRestart() throws InterruptedException {
  Assert.assertTrue(replicationManager.isRunning());
  replicationManager.stop();

  // Wait (bounded) for the asynchronous shutdown to complete.
  final long deadline = System.currentTimeMillis() + 5000L;
  while (replicationManager.isRunning()
      && System.currentTimeMillis() < deadline) {
    Thread.sleep(50L);
  }
  Assert.assertFalse("ReplicationManager did not stop within 5 seconds",
      replicationManager.isRunning());

  replicationManager.start();
  Assert.assertTrue(replicationManager.isRunning());
}
/**
 * ReplicationManager does not handle OPEN containers. This test verifies
 * that no datanode command at all is sent for a container in OPEN state.
 */
@Test
public void testOpenContainer() throws SCMException, InterruptedException {
  final ContainerInfo openContainer = getContainer(LifeCycleState.OPEN);
  containerStateManager.loadContainer(openContainer);

  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver any command to the handler.
  Thread.sleep(100L);

  final int totalCommands = datanodeCommandHandler.getInvocation();
  Assert.assertEquals(0, totalCommands);
}
/**
 * For a container in CLOSING state the close container command is re-sent
 * to every datanode holding a replica, on each ReplicationManager run.
 */
@Test
public void testClosingContainer() throws
    SCMException, ContainerNotFoundException, InterruptedException {
  final SCMCommandProto.Type closeType =
      SCMCommandProto.Type.closeContainerCommand;

  final ContainerInfo closingContainer = getContainer(LifeCycleState.CLOSING);
  final ContainerID containerId = closingContainer.containerID();
  containerStateManager.loadContainer(closingContainer);

  // Replica set: two CLOSING replicas plus one that is still OPEN.
  final Set<ContainerReplica> reported = getReplicas(
      containerId, State.CLOSING,
      randomDatanodeDetails(),
      randomDatanodeDetails());
  final DatanodeDetails openNode = randomDatanodeDetails();
  reported.addAll(getReplicas(containerId, State.OPEN, openNode));
  for (ContainerReplica each : reported) {
    containerStateManager.updateContainerReplica(containerId, each);
  }

  final int closeBaseline =
      datanodeCommandHandler.getInvocationCount(closeType);

  // First run: all three datanodes get a close command.
  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the commands to the handler.
  Thread.sleep(100L);
  Assert.assertEquals(closeBaseline + 3,
      datanodeCommandHandler.getInvocationCount(closeType));

  // The formerly OPEN replica now reports CLOSING.
  final Set<ContainerReplica> nowClosing =
      getReplicas(containerId, State.CLOSING, openNode);
  for (ContainerReplica each : nowClosing) {
    containerStateManager.updateContainerReplica(containerId, each);
  }

  // Second run: three more close commands are sent.
  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the commands to the handler.
  Thread.sleep(100L);
  Assert.assertEquals(closeBaseline + 6,
      datanodeCommandHandler.getInvocationCount(closeType));
}
/**
 * The container is QUASI_CLOSED but two of its replicas are still in OPEN
 * state. ReplicationManager should re-send the close command to the
 * datanodes holding those two replicas.
 */
@Test
public void testQuasiClosedContainerWithTwoOpenReplica() throws
    SCMException, ContainerNotFoundException, InterruptedException {
  final SCMCommandProto.Type closeType =
      SCMCommandProto.Type.closeContainerCommand;

  final ContainerInfo quasiClosed = getContainer(LifeCycleState.QUASI_CLOSED);
  final ContainerID containerId = quasiClosed.containerID();
  final UUID sharedOrigin = UUID.randomUUID();

  final ContainerReplica quasiClosedReplica = getReplicas(
      containerId, State.QUASI_CLOSED, 1000L, sharedOrigin,
      randomDatanodeDetails());
  final ContainerReplica firstOpenReplica = getReplicas(
      containerId, State.OPEN, 1000L, sharedOrigin,
      randomDatanodeDetails());
  // The second OPEN replica uses its own datanode as origin node.
  final DatanodeDetails selfOriginNode = randomDatanodeDetails();
  final ContainerReplica secondOpenReplica = getReplicas(
      containerId, State.OPEN, 1000L, selfOriginNode.getUuid(),
      selfOriginNode);

  containerStateManager.loadContainer(quasiClosed);
  containerStateManager.updateContainerReplica(
      containerId, quasiClosedReplica);
  containerStateManager.updateContainerReplica(
      containerId, firstOpenReplica);
  containerStateManager.updateContainerReplica(
      containerId, secondOpenReplica);

  final int closeBaseline =
      datanodeCommandHandler.getInvocationCount(closeType);

  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the commands to the handler.
  Thread.sleep(100L);

  // Exactly the two OPEN replicas must have been asked to close.
  Assert.assertEquals(closeBaseline + 2,
      datanodeCommandHandler.getInvocationCount(closeType));
  Assert.assertTrue(datanodeCommandHandler.received(
      closeType, firstOpenReplica.getDatanodeDetails()));
  Assert.assertTrue(datanodeCommandHandler.received(
      closeType, secondOpenReplica.getDatanodeDetails()));
}
/**
 * A QUASI_CLOSED container whose three replicas are all QUASI_CLOSED, but
 * which has no quorum to force close the container, must be left alone by
 * ReplicationManager.
 */
@Test
public void testHealthyQuasiClosedContainer() throws
    SCMException, ContainerNotFoundException, InterruptedException {
  final ContainerInfo quasiClosed = getContainer(LifeCycleState.QUASI_CLOSED);
  final ContainerID containerId = quasiClosed.containerID();
  // Every replica shares this origin node id, hence no quorum.
  final UUID sharedOrigin = UUID.randomUUID();

  containerStateManager.loadContainer(quasiClosed);
  for (int i = 0; i < 3; i++) {
    final ContainerReplica replica = getReplicas(
        containerId, State.QUASI_CLOSED, 1000L, sharedOrigin,
        randomDatanodeDetails());
    containerStateManager.updateContainerReplica(containerId, replica);
  }

  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver any command to the handler.
  Thread.sleep(100L);

  // The container cannot be closed, so no command may have been sent.
  Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
}
/**
 * When a container is QUASI_CLOSED and there is no quorum to force close
 * it, all of its replicas are expected to be QUASI_CLOSED; otherwise
 * ReplicationManager takes action.
 *
 * Here one replica is turned UNHEALTHY: ReplicationManager must send a
 * delete container command to the datanode holding it and, once that
 * replica is gone, a replicate command to restore the replica count.
 */
@Test
public void testQuasiClosedContainerWithUnhealthyReplica()
    throws SCMException, ContainerNotFoundException, InterruptedException,
    ContainerReplicaNotFoundException {
  final SCMCommandProto.Type deleteType =
      SCMCommandProto.Type.deleteContainerCommand;
  final SCMCommandProto.Type replicateType =
      SCMCommandProto.Type.replicateContainerCommand;

  final ContainerInfo quasiClosed = getContainer(LifeCycleState.QUASI_CLOSED);
  final ContainerID containerId = quasiClosed.containerID();
  final UUID sharedOrigin = UUID.randomUUID();
  final ContainerReplica firstReplica = getReplicas(
      containerId, State.QUASI_CLOSED, 1000L, sharedOrigin,
      randomDatanodeDetails());
  final ContainerReplica secondReplica = getReplicas(
      containerId, State.QUASI_CLOSED, 1000L, sharedOrigin,
      randomDatanodeDetails());
  final ContainerReplica thirdReplica = getReplicas(
      containerId, State.QUASI_CLOSED, 1000L, sharedOrigin,
      randomDatanodeDetails());

  containerStateManager.loadContainer(quasiClosed);
  containerStateManager.updateContainerReplica(containerId, firstReplica);
  containerStateManager.updateContainerReplica(containerId, secondReplica);
  containerStateManager.updateContainerReplica(containerId, thirdReplica);

  final int deleteBaseline =
      datanodeCommandHandler.getInvocationCount(deleteType);
  final int replicateBaseline =
      datanodeCommandHandler.getInvocationCount(replicateType);

  // Step 1: three QUASI_CLOSED replicas with one origin node id; the
  // container cannot be closed and nothing should be sent.
  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver any command to the handler.
  Thread.sleep(100L);
  Assert.assertEquals(0, datanodeCommandHandler.getInvocation());

  // Step 2: the first replica's datanode now reports it as UNHEALTHY.
  final ContainerReplica unhealthy = getReplicas(
      containerId, State.UNHEALTHY, 1000L, sharedOrigin,
      firstReplica.getDatanodeDetails());
  containerStateManager.updateContainerReplica(containerId, unhealthy);

  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the command to the handler.
  Thread.sleep(100L);
  Assert.assertEquals(deleteBaseline + 1,
      datanodeCommandHandler.getInvocationCount(deleteType));
  Assert.assertTrue(datanodeCommandHandler.received(
      deleteType, firstReplica.getDatanodeDetails()));

  // Step 3: drop the unhealthy replica from the in-memory state; the
  // container is then under replicated and must be re-replicated.
  containerStateManager.removeContainerReplica(containerId, firstReplica);

  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the command to the handler.
  Thread.sleep(100L);
  Assert.assertEquals(replicateBaseline + 1,
      datanodeCommandHandler.getInvocationCount(replicateType));
}
/**
 * A QUASI_CLOSED container with four replicas is over replicated;
 * ReplicationManager should delete the one excess replica.
 */
@Test
public void testOverReplicatedQuasiClosedContainer() throws
    SCMException, ContainerNotFoundException, InterruptedException {
  final SCMCommandProto.Type deleteType =
      SCMCommandProto.Type.deleteContainerCommand;

  final ContainerInfo quasiClosed = getContainer(LifeCycleState.QUASI_CLOSED);
  final ContainerID containerId = quasiClosed.containerID();
  final UUID sharedOrigin = UUID.randomUUID();

  containerStateManager.loadContainer(quasiClosed);
  // Report four QUASI_CLOSED replicas, one more than needed.
  for (int i = 0; i < 4; i++) {
    final ContainerReplica replica = getReplicas(
        containerId, State.QUASI_CLOSED, 1000L, sharedOrigin,
        randomDatanodeDetails());
    containerStateManager.updateContainerReplica(containerId, replica);
  }

  final int deleteBaseline =
      datanodeCommandHandler.getInvocationCount(deleteType);

  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the command to the handler.
  Thread.sleep(100L);

  Assert.assertEquals(deleteBaseline + 1,
      datanodeCommandHandler.getInvocationCount(deleteType));
}
/**
 * When a QUASI_CLOSED container is over replicated, ReplicationManager
 * deletes the excess replica. When picking the replica to delete it should
 * prefer an UNHEALTHY replica over a QUASI_CLOSED one.
 */
@Test
public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
    throws SCMException, ContainerNotFoundException, InterruptedException {
  final SCMCommandProto.Type deleteType =
      SCMCommandProto.Type.deleteContainerCommand;

  final ContainerInfo quasiClosed = getContainer(LifeCycleState.QUASI_CLOSED);
  final ContainerID containerId = quasiClosed.containerID();
  final UUID sharedOrigin = UUID.randomUUID();

  // One UNHEALTHY replica followed by three QUASI_CLOSED replicas.
  final ContainerReplica unhealthy = getReplicas(
      containerId, State.UNHEALTHY, 1000L, sharedOrigin,
      randomDatanodeDetails());
  final ContainerReplica healthyA = getReplicas(
      containerId, State.QUASI_CLOSED, 1000L, sharedOrigin,
      randomDatanodeDetails());
  final ContainerReplica healthyB = getReplicas(
      containerId, State.QUASI_CLOSED, 1000L, sharedOrigin,
      randomDatanodeDetails());
  final ContainerReplica healthyC = getReplicas(
      containerId, State.QUASI_CLOSED, 1000L, sharedOrigin,
      randomDatanodeDetails());

  containerStateManager.loadContainer(quasiClosed);
  containerStateManager.updateContainerReplica(containerId, unhealthy);
  containerStateManager.updateContainerReplica(containerId, healthyA);
  containerStateManager.updateContainerReplica(containerId, healthyB);
  containerStateManager.updateContainerReplica(containerId, healthyC);

  final int deleteBaseline =
      datanodeCommandHandler.getInvocationCount(deleteType);

  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the command to the handler.
  Thread.sleep(100L);

  // A single delete, addressed to the node with the UNHEALTHY replica.
  Assert.assertEquals(deleteBaseline + 1,
      datanodeCommandHandler.getInvocationCount(deleteType));
  Assert.assertTrue(datanodeCommandHandler.received(
      deleteType, unhealthy.getDatanodeDetails()));
}
/**
 * ReplicationManager should replicate a QUASI_CLOSED container when it is
 * under replicated (here: only two replicas are reported).
 */
@Test
public void testUnderReplicatedQuasiClosedContainer() throws
    SCMException, ContainerNotFoundException, InterruptedException {
  final SCMCommandProto.Type replicateType =
      SCMCommandProto.Type.replicateContainerCommand;

  final ContainerInfo quasiClosed = getContainer(LifeCycleState.QUASI_CLOSED);
  final ContainerID containerId = quasiClosed.containerID();
  final UUID sharedOrigin = UUID.randomUUID();

  containerStateManager.loadContainer(quasiClosed);
  // Only two QUASI_CLOSED replicas are known.
  for (int i = 0; i < 2; i++) {
    final ContainerReplica replica = getReplicas(
        containerId, State.QUASI_CLOSED, 1000L, sharedOrigin,
        randomDatanodeDetails());
    containerStateManager.updateContainerReplica(containerId, replica);
  }

  final int replicateBaseline =
      datanodeCommandHandler.getInvocationCount(replicateType);

  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the command to the handler.
  Thread.sleep(100L);

  Assert.assertEquals(replicateBaseline + 1,
      datanodeCommandHandler.getInvocationCount(replicateType));
}
/**
 * When a QUASI_CLOSED container is under replicated, ReplicationManager
 * should re-replicate it; any unhealthy replica has to be deleted.
 *
 * The container in this test is QUASI_CLOSED, under replicated, and also
 * has an unhealthy replica. The expected sequence is:
 *
 * 1. First iteration: the container is re-replicated so that it has
 *    enough replicas.
 * 2. Second iteration: the unhealthy replica is deleted.
 * 3. Third iteration: the container is re-replicated again, because
 *    deleting the unhealthy replica made it under replicated once more.
 */
@Test
public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
    throws SCMException, ContainerNotFoundException, InterruptedException,
    ContainerReplicaNotFoundException {
  final SCMCommandProto.Type replicateType =
      SCMCommandProto.Type.replicateContainerCommand;
  final SCMCommandProto.Type deleteType =
      SCMCommandProto.Type.deleteContainerCommand;

  final ContainerInfo quasiClosed = getContainer(LifeCycleState.QUASI_CLOSED);
  final ContainerID containerId = quasiClosed.containerID();
  final UUID sharedOrigin = UUID.randomUUID();
  final ContainerReplica healthyReplica = getReplicas(
      containerId, State.QUASI_CLOSED, 1000L, sharedOrigin,
      randomDatanodeDetails());
  final ContainerReplica unhealthyReplica = getReplicas(
      containerId, State.UNHEALTHY, 1000L, sharedOrigin,
      randomDatanodeDetails());

  containerStateManager.loadContainer(quasiClosed);
  containerStateManager.updateContainerReplica(containerId, healthyReplica);
  containerStateManager.updateContainerReplica(
      containerId, unhealthyReplica);

  final int replicateBaseline =
      datanodeCommandHandler.getInvocationCount(replicateType);
  final int deleteBaseline =
      datanodeCommandHandler.getInvocationCount(deleteType);

  // Iteration 1: expect one replicate command.
  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the command to the handler.
  Thread.sleep(100L);
  Assert.assertEquals(replicateBaseline + 1,
      datanodeCommandHandler.getInvocationCount(replicateType));

  final Optional<CommandForDatanode> firstReplicate =
      datanodeCommandHandler.getReceivedCommands().stream()
          .filter(cmd -> cmd.getCommand().getType().equals(replicateType))
          .findFirst();
  Assert.assertTrue(firstReplicate.isPresent());

  // Report a new QUASI_CLOSED replica from the datanode that was asked to
  // replicate, as if the replication had completed.
  final DatanodeDetails targetNode = createDatanodeDetails(
      firstReplicate.get().getDatanodeId());
  final ContainerReplica copiedReplica = getReplicas(
      containerId, State.QUASI_CLOSED, 1000L, sharedOrigin, targetNode);
  containerStateManager.updateContainerReplica(containerId, copiedReplica);

  // Iteration 2: with the new replica reported, the unhealthy replica
  // should now be deleted.
  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the command to the handler.
  Thread.sleep(100L);
  Assert.assertEquals(deleteBaseline + 1,
      datanodeCommandHandler.getInvocationCount(deleteType));
  // The delete must target the node holding the unhealthy replica.
  Assert.assertTrue(datanodeCommandHandler.received(
      deleteType, unhealthyReplica.getDatanodeDetails()));

  containerStateManager.removeContainerReplica(
      containerId, unhealthyReplica);

  // Iteration 3: the unhealthy replica is gone, so the container is under
  // replicated again and one more replicate command is expected.
  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the command to the handler.
  Thread.sleep(100L);
  Assert.assertEquals(replicateBaseline + 2,
      datanodeCommandHandler.getInvocationCount(replicateType));
}
/**
 * When a container is QUASI_CLOSED and more than 50% of its replicas are
 * QUASI_CLOSED with unique origin node ids, ReplicationManager should
 * force close the replica(s) with the highest BCSID.
 */
@Test
public void testQuasiClosedToClosed() throws
    SCMException, ContainerNotFoundException, InterruptedException {
  final SCMCommandProto.Type closeType =
      SCMCommandProto.Type.closeContainerCommand;

  final ContainerInfo quasiClosed = getContainer(LifeCycleState.QUASI_CLOSED);
  final ContainerID containerId = quasiClosed.containerID();
  final Set<ContainerReplica> quasiClosedReplicas = getReplicas(
      containerId, State.QUASI_CLOSED,
      randomDatanodeDetails(),
      randomDatanodeDetails(),
      randomDatanodeDetails());

  containerStateManager.loadContainer(quasiClosed);
  for (ContainerReplica each : quasiClosedReplicas) {
    containerStateManager.updateContainerReplica(containerId, each);
  }

  final int closeBaseline =
      datanodeCommandHandler.getInvocationCount(closeType);

  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver the commands to the handler.
  Thread.sleep(100L);

  // All replicas have the same BCSID, so each of the three is closed.
  Assert.assertEquals(closeBaseline + 3,
      datanodeCommandHandler.getInvocationCount(closeType));
}
/**
 * A CLOSED container with three CLOSED replicas is healthy;
 * ReplicationManager should not send any command for it.
 */
@Test
public void testHealthyClosedContainer()
    throws SCMException, ContainerNotFoundException, InterruptedException {
  final ContainerInfo closedContainer = getContainer(LifeCycleState.CLOSED);
  final ContainerID containerId = closedContainer.containerID();
  final Set<ContainerReplica> closedReplicas = getReplicas(
      containerId, State.CLOSED,
      randomDatanodeDetails(),
      randomDatanodeDetails(),
      randomDatanodeDetails());

  containerStateManager.loadContainer(closedContainer);
  for (ContainerReplica each : closedReplicas) {
    containerStateManager.updateContainerReplica(containerId, each);
  }

  replicationManager.processContainersNow();
  // Give the EventQueue time to deliver any command to the handler.
  Thread.sleep(100L);

  final int totalCommands = datanodeCommandHandler.getInvocation();
  Assert.assertEquals(0, totalCommands);
}
/**
 * Checks that the default event timeout of ReplicationManagerConfiguration
 * is available through a plain OzoneConfiguration.
 */
@Test
public void testGeneratedConfig() {
  final OzoneConfiguration freshConf = new OzoneConfiguration();
  final ReplicationManagerConfiguration generated =
      freshConf.getObject(ReplicationManagerConfiguration.class);

  // The default is not part of ozone-site.xml; it is generated from the
  // annotation into ozone-site-generated.xml, which OzoneConfiguration
  // should load.
  Assert.assertEquals(600000, generated.getEventTimeout());
}
/**
 * Releases the resources created in setup().
 *
 * The ReplicationManager is stopped in a finally block so that its
 * background work is shut down even when closing the
 * ContainerStateManager throws. Both fields are null-checked because this
 * method also runs when setup() failed before assigning them.
 *
 * @throws IOException if closing the ContainerStateManager fails
 */
@After
public void teardown() throws IOException {
  try {
    if (containerStateManager != null) {
      containerStateManager.close();
    }
  } finally {
    if (replicationManager != null) {
      replicationManager.stop();
    }
  }
}
private class DatanodeCommandHandler implements
EventHandler<CommandForDatanode> {
private AtomicInteger invocation = new AtomicInteger(0);
private Map<SCMCommandProto.Type, AtomicInteger> commandInvocation =
new HashMap<>();
private List<CommandForDatanode> commands = new ArrayList<>();
@Override
public void onMessage(final CommandForDatanode command,
final EventPublisher publisher) {
final SCMCommandProto.Type type = command.getCommand().getType();
commandInvocation.computeIfAbsent(type, k -> new AtomicInteger(0));
commandInvocation.get(type).incrementAndGet();
invocation.incrementAndGet();
commands.add(command);
}
private int getInvocation() {
return invocation.get();
}
private int getInvocationCount(SCMCommandProto.Type type) {
return commandInvocation.containsKey(type) ?
commandInvocation.get(type).get() : 0;
}
private List<CommandForDatanode> getReceivedCommands() {
return commands;
}
/**
* Returns true if the command handler has received the given
* command type for the provided datanode.
*
* @param type Command Type
* @param datanode DatanodeDetails
* @return True if command was received, false otherwise
*/
private boolean received(final SCMCommandProto.Type type,
final DatanodeDetails datanode) {
return commands.stream().anyMatch(dc ->
dc.getCommand().getType().equals(type) &&
dc.getDatanodeId().equals(datanode.getUuid()));
}
}
}