blob: b1816774241cd388cc488bd9951e07ad155f4e5d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.apache.hadoop.yarn.server.resourcemanager.NodeAttributeTestUtils;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.Assert;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Unit tests for node attribute manager.
*/
public class TestNodeAttributesManager {
private NodeAttributesManager attributesManager;
private final static String[] PREFIXES =
new String[] {"yarn.test1.io", "yarn.test2.io", "yarn.test3.io"};
private final static String[] HOSTNAMES =
new String[] {"host1", "host2", "host3"};
@Before
public void init() throws IOException {
Configuration conf = new Configuration();
attributesManager = new NodeAttributesManagerImpl();
conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
conf = NodeAttributeTestUtils.getRandomDirConf(conf);
attributesManager.init(conf);
attributesManager.start();
}
@After
public void cleanUp() {
if (attributesManager != null) {
attributesManager.stop();
}
}
private Set<NodeAttribute> createAttributesForTest(String attributePrefix,
int numOfAttributes, String attributeNamePrefix,
String attributeValuePrefix) {
Set<NodeAttribute> attributes = new HashSet<>();
for (int i = 0; i< numOfAttributes; i++) {
NodeAttribute attribute = NodeAttribute.newInstance(
attributePrefix, attributeNamePrefix + "_" + i,
NodeAttributeType.STRING, attributeValuePrefix + "_" + i);
attributes.add(attribute);
}
return attributes;
}
private boolean sameAttributeSet(Set<NodeAttribute> set1,
Set<NodeAttribute> set2) {
return Sets.difference(set1, set2).isEmpty();
}
@Test
public void testAddNodeAttributes() throws IOException {
Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
Map<NodeAttribute, AttributeValue> nodeAttributes;
// Add 3 attributes to host1
// yarn.test1.io/A1=host1_v1_1
// yarn.test1.io/A2=host1_v1_2
// yarn.test1.io/A3=host1_v1_3
toAddAttributes.put(HOSTNAMES[0],
createAttributesForTest(PREFIXES[0], 3, "A", "host1_v1"));
attributesManager.addNodeAttributes(toAddAttributes);
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
Assert.assertEquals(3, nodeAttributes.size());
Assert.assertTrue(sameAttributeSet(toAddAttributes.get(HOSTNAMES[0]),
nodeAttributes.keySet()));
// Add 2 attributes to host2
// yarn.test1.io/A1=host2_v1_1
// yarn.test1.io/A2=host2_v1_2
toAddAttributes.clear();
toAddAttributes.put(HOSTNAMES[1],
createAttributesForTest(PREFIXES[0], 2, "A", "host2_v1"));
attributesManager.addNodeAttributes(toAddAttributes);
// Verify host1 attributes are still valid.
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
Assert.assertEquals(3, nodeAttributes.size());
// Verify new added host2 attributes are correctly updated.
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[1]);
Assert.assertEquals(2, nodeAttributes.size());
Assert.assertTrue(sameAttributeSet(toAddAttributes.get(HOSTNAMES[1]),
nodeAttributes.keySet()));
// Cluster wide, it only has 3 attributes.
// yarn.test1.io/A1
// yarn.test1.io/A2
// yarn.test1.io/A3
Set<NodeAttribute> clusterAttributes = attributesManager
.getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0]));
Assert.assertEquals(3, clusterAttributes.size());
// Query for attributes under a non-exist prefix,
// ensure it returns an empty set.
clusterAttributes = attributesManager
.getClusterNodeAttributes(Sets.newHashSet("non_exist_prefix"));
Assert.assertEquals(0, clusterAttributes.size());
// Not provide any prefix, ensure it returns all attributes.
clusterAttributes = attributesManager.getClusterNodeAttributes(null);
Assert.assertEquals(3, clusterAttributes.size());
// Add some other attributes with different prefixes on host1 and host2.
toAddAttributes.clear();
// Host1
// yarn.test2.io/A_1=host1_v2_1
// ...
// yarn.test2.io/A_10=host1_v2_10
toAddAttributes.put(HOSTNAMES[0],
createAttributesForTest(PREFIXES[1], 10, "C", "host1_v2"));
// Host2
// yarn.test2.io/C_1=host1_v2_1
// ...
// yarn.test2.io/C_20=host1_v2_20
toAddAttributes.put(HOSTNAMES[1],
createAttributesForTest(PREFIXES[1], 20, "C", "host1_v2"));
attributesManager.addNodeAttributes(toAddAttributes);
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
Assert.assertEquals(13, nodeAttributes.size());
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[1]);
Assert.assertEquals(22, nodeAttributes.size());
}
@Test
public void testRemoveNodeAttributes() throws IOException {
Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
Map<String, Set<NodeAttribute>> toRemoveAttributes = new HashMap<>();
Set<NodeAttribute> allAttributesPerPrefix = new HashSet<>();
Map<NodeAttribute, AttributeValue> nodeAttributes;
// Host1 -----------------------
// yarn.test1.io
// A1=host1_v1_1
// A2=host1_v1_2
// A3=host1_v1_3
// yarn.test2.io
// B1=host1_v2_1
// ...
// B5=host5_v2_5
// Host2 -----------------------
// yarn.test1.io
// A1=host2_v1_1
// A2=host2_v1_2
// yarn.test3.io
// C1=host2_v3_1
// c2=host2_v3_2
Set<NodeAttribute> host1set = new HashSet<>();
Set<NodeAttribute> host1set1 =
createAttributesForTest(PREFIXES[0], 3, "A", "host1_v1");
Set<NodeAttribute> host1set2 =
createAttributesForTest(PREFIXES[1], 5, "B", "host1_v1");
host1set.addAll(host1set1);
host1set.addAll(host1set2);
Set<NodeAttribute> host2set = new HashSet<>();
Set<NodeAttribute> host2set1 =
createAttributesForTest(PREFIXES[0], 2, "A", "host2_v1");
Set<NodeAttribute> host2set2 =
createAttributesForTest(PREFIXES[2], 2, "C", "host2_v3");
host2set.addAll(host2set1);
host2set.addAll(host2set2);
toAddAttributes.put(HOSTNAMES[0], host1set);
toAddAttributes.put(HOSTNAMES[1], host2set);
attributesManager.addNodeAttributes(toAddAttributes);
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
Assert.assertEquals(8, nodeAttributes.size());
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[1]);
Assert.assertEquals(4, nodeAttributes.size());
allAttributesPerPrefix = attributesManager
.getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0]));
Assert.assertEquals(3, allAttributesPerPrefix.size());
allAttributesPerPrefix = attributesManager
.getClusterNodeAttributes(Sets.newHashSet(PREFIXES[1]));
Assert.assertEquals(5, allAttributesPerPrefix.size());
allAttributesPerPrefix = attributesManager
.getClusterNodeAttributes(Sets.newHashSet(PREFIXES[2]));
Assert.assertEquals(2, allAttributesPerPrefix.size());
// Remove "yarn.test1.io/A_2" from host1
Set<NodeAttribute> attributes2rm1 = new HashSet<>();
attributes2rm1.add(NodeAttribute.newInstance(PREFIXES[0], "A_2",
NodeAttributeType.STRING, "anyValue"));
toRemoveAttributes.put(HOSTNAMES[0], attributes2rm1);
attributesManager.removeNodeAttributes(toRemoveAttributes);
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
Assert.assertEquals(7, nodeAttributes.size());
// Remove again, but give a non-exist attribute name
attributes2rm1.clear();
toRemoveAttributes.clear();
attributes2rm1.add(NodeAttribute.newInstance(PREFIXES[0], "non_exist_name",
NodeAttributeType.STRING, "anyValue"));
toRemoveAttributes.put(HOSTNAMES[0], attributes2rm1);
attributesManager.removeNodeAttributes(toRemoveAttributes);
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
Assert.assertEquals(7, nodeAttributes.size());
// Remove "yarn.test1.io/A_2" from host2 too,
// by then there will be no such attribute exist in the cluster.
Set<NodeAttribute> attributes2rm2 = new HashSet<>();
attributes2rm2.add(NodeAttribute.newInstance(PREFIXES[0], "A_2",
NodeAttributeType.STRING, "anyValue"));
toRemoveAttributes.clear();
toRemoveAttributes.put(HOSTNAMES[1], attributes2rm2);
attributesManager.removeNodeAttributes(toRemoveAttributes);
// Make sure cluster wide attributes are still consistent.
// Since both host1 and host2 doesn't have "yarn.test1.io/A_2",
// get all attributes under prefix "yarn.test1.io" should only return
// us A_1 and A_3.
allAttributesPerPrefix = attributesManager
.getClusterNodeAttributes(Sets.newHashSet(PREFIXES[0]));
Assert.assertEquals(2, allAttributesPerPrefix.size());
}
@Test
public void testReplaceNodeAttributes() throws IOException {
Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
Map<String, Set<NodeAttribute>> toReplaceMap = new HashMap<>();
Map<NodeAttribute, AttributeValue> nodeAttributes;
Set<NodeAttribute> filteredAttributes;
Set<NodeAttribute> clusterAttributes;
// Add 3 attributes to host1
// yarn.test1.io/A1=host1_v1_1
// yarn.test1.io/A2=host1_v1_2
// yarn.test1.io/A3=host1_v1_3
toAddAttributes.put(HOSTNAMES[0],
createAttributesForTest(PREFIXES[0], 3, "A", "host1_v1"));
attributesManager.addNodeAttributes(toAddAttributes);
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
Assert.assertEquals(3, nodeAttributes.size());
// Add 10 distributed node attributes to host1
// nn.yarn.io/dist-node-attribute1=dist_v1_1
// nn.yarn.io/dist-node-attribute2=dist_v1_2
// ...
// nn.yarn.io/dist-node-attribute10=dist_v1_10
toAddAttributes.clear();
toAddAttributes.put(HOSTNAMES[0],
createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED,
10, "dist-node-attribute", "dist_v1"));
attributesManager.addNodeAttributes(toAddAttributes);
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
Assert.assertEquals(13, nodeAttributes.size());
clusterAttributes = attributesManager.getClusterNodeAttributes(
Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED, PREFIXES[0]));
Assert.assertEquals(13, clusterAttributes.size());
// Replace by prefix
// Same distributed attributes names, but different values.
Set<NodeAttribute> toReplaceAttributes =
createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED, 5,
"dist-node-attribute", "dist_v2");
attributesManager.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
ImmutableMap.of(HOSTNAMES[0], toReplaceAttributes));
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
Assert.assertEquals(8, nodeAttributes.size());
clusterAttributes = attributesManager.getClusterNodeAttributes(
Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED, PREFIXES[0]));
Assert.assertEquals(8, clusterAttributes.size());
// Now we have 5 distributed attributes
filteredAttributes = NodeLabelUtil.filterAttributesByPrefix(
nodeAttributes.keySet(), NodeAttribute.PREFIX_DISTRIBUTED);
Assert.assertEquals(5, filteredAttributes.size());
// Values are updated to have prefix dist_v2
Assert.assertTrue(filteredAttributes.stream().allMatch(
nodeAttribute ->
nodeAttribute.getAttributeValue().startsWith("dist_v2")));
// We still have 3 yarn.test1.io attributes
filteredAttributes = NodeLabelUtil.filterAttributesByPrefix(
nodeAttributes.keySet(), PREFIXES[0]);
Assert.assertEquals(3, filteredAttributes.size());
// Replace with prefix
// Different attribute names
toReplaceAttributes =
createAttributesForTest(NodeAttribute.PREFIX_DISTRIBUTED, 1,
"dist-node-attribute-v2", "dist_v3");
attributesManager.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
ImmutableMap.of(HOSTNAMES[0], toReplaceAttributes));
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
Assert.assertEquals(4, nodeAttributes.size());
clusterAttributes = attributesManager.getClusterNodeAttributes(
Sets.newHashSet(NodeAttribute.PREFIX_DISTRIBUTED));
Assert.assertEquals(1, clusterAttributes.size());
NodeAttribute attr = clusterAttributes.iterator().next();
Assert.assertEquals("dist-node-attribute-v2_0",
attr.getAttributeKey().getAttributeName());
Assert.assertEquals(NodeAttribute.PREFIX_DISTRIBUTED,
attr.getAttributeKey().getAttributePrefix());
Assert.assertEquals("dist_v3_0", attr.getAttributeValue());
// Replace all attributes
toReplaceMap.put(HOSTNAMES[0],
createAttributesForTest(PREFIXES[1], 2, "B", "B_v1"));
attributesManager.replaceNodeAttributes(null, toReplaceMap);
nodeAttributes = attributesManager.getAttributesForNode(HOSTNAMES[0]);
Assert.assertEquals(2, nodeAttributes.size());
clusterAttributes = attributesManager
.getClusterNodeAttributes(Sets.newHashSet(PREFIXES[1]));
Assert.assertEquals(2, clusterAttributes.size());
clusterAttributes = attributesManager
.getClusterNodeAttributes(Sets.newHashSet(
NodeAttribute.PREFIX_DISTRIBUTED));
Assert.assertEquals(0, clusterAttributes.size());
}
}