// blob: 4687aa08a287f0fb4919fa763921c3aed3a06138 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon.scm;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.defaultLayoutVersionProto;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
import org.apache.ozone.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
/**
 * Tests for Recon Node Manager.
 *
 * <p>Each test runs against a fresh {@link DBStore} created under a temporary
 * metadata directory. Tests obtain their {@link ReconNodeManager} through
 * {@link #newNodeManager()} so that whichever instance is still open when the
 * test ends (normally or through a failed assertion) is closed in
 * {@link #tearDown()}.
 */
public class TestReconNodeManager {

  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  private OzoneConfiguration conf;
  private DBStore store;
  private ReconStorageConfig reconStorageConfig;
  private HDDSLayoutVersionManager versionManager;
  private EventQueue eventQueue;
  private NetworkTopology clusterMap;
  private Table<UUID, DatanodeDetails> nodeTable;

  /** Node manager created by the running test that has not been closed. */
  private ReconNodeManager openNodeManager;

  /**
   * Builds the configuration, the Recon SCM DB and the collaborators every
   * node manager in this class is constructed from.
   *
   * @throws Exception if the temporary folder or the DB cannot be created
   */
  @Before
  public void setUp() throws Exception {
    conf = new OzoneConfiguration();
    conf.set(OZONE_METADATA_DIRS,
        temporaryFolder.newFolder().getAbsolutePath());
    conf.set(OZONE_SCM_NAMES, "localhost");
    reconStorageConfig = new ReconStorageConfig(conf);
    versionManager = new HDDSLayoutVersionManager(
        reconStorageConfig.getLayoutVersion());
    store = DBStoreBuilder.createDBStore(conf, new ReconSCMDBDefinition());
    nodeTable = ReconSCMDBDefinition.NODES.getTable(store);
    clusterMap = new NetworkTopologyImpl(conf);
    eventQueue = new EventQueue();
  }

  /**
   * Closes the node manager left open by the test (if any) and then the DB.
   *
   * @throws Exception if closing the node manager or the store fails
   */
  @After
  public void tearDown() throws Exception {
    try {
      closeNodeManager();
    } finally {
      // JUnit runs @After even when @Before threw, so the store may be unset.
      if (store != null) {
        store.close();
      }
    }
  }

  /**
   * Creates a node manager over the shared node table and remembers it so
   * that {@link #tearDown()} can close it. Any instance previously created
   * through this method and still open is closed first.
   *
   * @return the newly created node manager
   * @throws IOException if closing a previously created instance fails
   */
  private ReconNodeManager newNodeManager() throws IOException {
    closeNodeManager();
    openNodeManager = new ReconNodeManager(conf, reconStorageConfig,
        eventQueue, clusterMap, nodeTable, versionManager);
    return openNodeManager;
  }

  /**
   * Closes the tracked node manager, if one is open. The reference is cleared
   * before closing so a failing close is never retried on the same instance.
   *
   * @throws IOException if the node manager fails to close
   */
  private void closeNodeManager() throws IOException {
    if (openNodeManager == null) {
      return;
    }
    ReconNodeManager closing = openNodeManager;
    openNodeManager = null;
    closing.close();
  }

  /**
   * Asserts that the node status reported by the node manager carries the
   * same operational state and expiry as the persisted values on the given
   * datanode details.
   *
   * @param nodeManager node manager to query for the node status
   * @param dn datanode details holding the persisted operational state
   * @throws NodeNotFoundException if the node manager does not know the node
   */
  private static void assertNodeStatusMatchesPersistedState(
      ReconNodeManager nodeManager, DatanodeDetails dn)
      throws NodeNotFoundException {
    assertEquals(dn.getPersistedOpState(),
        nodeManager.getNodeStatus(dn).getOperationalState());
    assertEquals(dn.getPersistedOpStateExpiryEpochSec(),
        nodeManager.getNodeStatus(dn).getOpStateExpiryEpochSeconds());
  }

  /**
   * Registers a datanode, checks command filtering and operational state
   * handling on heartbeat, then recreates the node manager over the same
   * node table and checks that the node is still known.
   */
  @Test
  public void testReconNodeDB() throws IOException, NodeNotFoundException {
    ReconNodeManager reconNodeManager = newNodeManager();
    ReconNewNodeHandler reconNewNodeHandler =
        new ReconNewNodeHandler(reconNodeManager);
    assertTrue(reconNodeManager.getAllNodes().isEmpty());

    DatanodeDetails datanodeDetails = randomDatanodeDetails();
    String uuidString = datanodeDetails.getUuidString();

    // Register a random datanode.
    reconNodeManager.register(datanodeDetails, null, null);
    reconNewNodeHandler.onMessage(reconNodeManager.getNodeByUuid(uuidString),
        null);
    assertEquals(1, reconNodeManager.getAllNodes().size());
    assertNotNull(reconNodeManager.getNodeByUuid(uuidString));

    // Commands queued directly on the node manager are filtered when the
    // datanode heartbeats: only the allowed ones are handed back.
    // This command should never be returned by Recon.
    reconNodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
        new SetNodeOperationalStateCommand(1234, DECOMMISSIONING, 0));
    // This one should be returned.
    reconNodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
        new ReregisterCommand());

    // OperationalState sanity check.
    final DatanodeDetails dnDetails = reconNodeManager.getNodeByUuid(uuidString);
    assertEquals(IN_SERVICE, dnDetails.getPersistedOpState());
    assertNodeStatusMatchesPersistedState(reconNodeManager, dnDetails);

    // Upon processing the heartbeat, the illegal command should be filtered
    // out.
    List<SCMCommand> returnedCmds = reconNodeManager.processHeartbeat(
        datanodeDetails, defaultLayoutVersionProto());
    assertEquals(1, returnedCmds.size());
    assertEquals(SCMCommandProto.Type.reregisterCommand,
        returnedCmds.get(0).getType());

    // Now feed a DECOMMISSIONED heartbeat of the same DN.
    datanodeDetails.setPersistedOpState(
        HddsProtos.NodeOperationalState.DECOMMISSIONED);
    datanodeDetails.setPersistedOpStateExpiryEpochSec(12345L);
    reconNodeManager.processHeartbeat(datanodeDetails,
        defaultLayoutVersionProto());

    // Check both persistedOpState and NodeStatus#operationalState.
    assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONED,
        dnDetails.getPersistedOpState());
    assertEquals(12345L, dnDetails.getPersistedOpStateExpiryEpochSec());
    assertNodeStatusMatchesPersistedState(reconNodeManager, dnDetails);

    // Close the event queue and the node manager, then recreate the node
    // manager on top of the same (still open) node table.
    eventQueue.close();
    closeNodeManager();
    reconNodeManager = newNodeManager();

    // Verify that the node information was persisted and loaded back.
    assertEquals(1, reconNodeManager.getAllNodes().size());
    assertNotNull(reconNodeManager.getNodeByUuid(uuidString));
  }

  /**
   * Checks that updating the operational state from an SCM node fails for an
   * unregistered datanode and is applied for a registered one.
   */
  @Test
  public void testUpdateNodeOperationalStateFromScm() throws Exception {
    final ReconNodeManager reconNodeManager = newNodeManager();
    final DatanodeDetails datanodeDetails = randomDatanodeDetails();
    final String uuidString = datanodeDetails.getUuidString();
    final HddsProtos.Node node = mock(HddsProtos.Node.class);

    // The datanode has not been registered yet, so the update must fail.
    LambdaTestUtils.intercept(NodeNotFoundException.class, () -> {
      reconNodeManager.updateNodeOperationalStateFromScm(node, datanodeDetails);
    });

    reconNodeManager.register(datanodeDetails, null, null);
    assertEquals(IN_SERVICE,
        reconNodeManager.getNodeByUuid(uuidString).getPersistedOpState());

    // Make the mocked SCM node report DECOMMISSIONING as its first state.
    when(node.getNodeOperationalStates(eq(0)))
        .thenReturn(DECOMMISSIONING);
    reconNodeManager.updateNodeOperationalStateFromScm(node, datanodeDetails);
    assertEquals(DECOMMISSIONING,
        reconNodeManager.getNodeByUuid(uuidString).getPersistedOpState());

    List<DatanodeDetails> nodes =
        reconNodeManager.getNodes(DECOMMISSIONING, null);
    assertEquals(1, nodes.size());
    assertEquals(datanodeDetails.getUuid(), nodes.get(0).getUuid());
  }

  /**
   * Registers a datanode under one hostname, heartbeats with a different
   * hostname and checks the commands handed back to the datanode.
   */
  @Test
  public void testDatanodeUpdate() throws IOException {
    ReconNodeManager reconNodeManager = newNodeManager();
    ReconNewNodeHandler reconNewNodeHandler =
        new ReconNewNodeHandler(reconNodeManager);
    assertTrue(reconNodeManager.getAllNodes().isEmpty());

    DatanodeDetails datanodeDetails = randomDatanodeDetails();
    datanodeDetails.setHostName("hostname1");
    String uuidString = datanodeDetails.getUuidString();

    // Register "hostname1" datanode.
    reconNodeManager.register(datanodeDetails, null, null);
    reconNewNodeHandler.onMessage(reconNodeManager.getNodeByUuid(uuidString),
        null);
    assertEquals(1, reconNodeManager.getAllNodes().size());
    assertNotNull(reconNodeManager.getNodeByUuid(uuidString));
    assertEquals("hostname1",
        reconNodeManager.getNodeByUuid(uuidString).getHostName());

    datanodeDetails.setHostName("hostname2");

    // The heartbeat below reports a hostname that differs from the one used
    // at registration. No command was queued in this test, yet exactly one
    // re-register command is expected back.
    // NOTE(review): presumably Recon requests re-registration because the
    // datanode details changed - confirm against ReconNodeManager.
    List<SCMCommand> returnedCmds = reconNodeManager.processHeartbeat(
        datanodeDetails, defaultLayoutVersionProto());
    assertEquals(1, returnedCmds.size());
    assertEquals(SCMCommandProto.Type.reregisterCommand,
        returnedCmds.get(0).getType());
  }
}