| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager; |
| |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.CommonConfigurationKeysPublic; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.ha.HAServiceProtocol; |
| import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; |
| import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.GroupMappingServiceProvider; |
| import org.apache.hadoop.security.Groups; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.authorize.AccessControlList; |
| import org.apache.hadoop.security.authorize.ProxyUsers; |
| import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; |
| import org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider; |
| import org.apache.hadoop.yarn.api.ApplicationClientProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; |
| import org.apache.hadoop.yarn.api.records.DecommissionType; |
| import org.apache.hadoop.yarn.api.records.NodeAttribute; |
| import org.apache.hadoop.yarn.api.records.NodeAttributeType; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.NodeLabel; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.conf.ConfigurationProvider; |
| import org.apache.hadoop.yarn.conf.HAUtil; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; |
| import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_PROXY_USER_PREFIX; |
| import static org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration.NODES; |
| import static org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration.PREFIX; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; |
| import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; |
| |
| import static org.junit.Assert.assertTrue; |
| |
| public class TestRMAdminService { |
| |
| private Configuration configuration; |
| private MockRM rm = null; |
| private FileSystem fs; |
| private Path workingPath; |
| private Path tmpDir; |
| |
| static { |
| YarnConfiguration.addDefaultResource( |
| YarnConfiguration.CS_CONFIGURATION_FILE); |
| YarnConfiguration.addDefaultResource( |
| YarnConfiguration.DR_CONFIGURATION_FILE); |
| } |
| |
| @Before |
| public void setup() throws IOException { |
| QueueMetrics.clearQueueMetrics(); |
| DefaultMetricsSystem.setMiniClusterMode(true); |
| |
| configuration = new YarnConfiguration(); |
| configuration.set(YarnConfiguration.RM_SCHEDULER, |
| CapacityScheduler.class.getCanonicalName()); |
| fs = FileSystem.get(configuration); |
| workingPath = |
| new Path(new File("target", this.getClass().getSimpleName() |
| + "-remoteDir").getAbsolutePath()); |
| configuration.set(YarnConfiguration.FS_BASED_RM_CONF_STORE, |
| workingPath.toString()); |
| tmpDir = new Path(new File("target", this.getClass().getSimpleName() |
| + "-tmpDir").getAbsolutePath()); |
| fs.delete(workingPath, true); |
| fs.delete(tmpDir, true); |
| fs.mkdirs(workingPath); |
| fs.mkdirs(tmpDir); |
| |
| // reset the groups to what it default test settings |
| MockUnixGroupsMapping.resetGroups(); |
| } |
| |
| @After |
| public void tearDown() throws IOException { |
| if (rm != null) { |
| rm.stop(); |
| } |
| fs.delete(workingPath, true); |
| fs.delete(tmpDir, true); |
| } |
| |
| @Test |
| public void testAdminRefreshQueuesWithLocalConfigurationProvider() |
| throws IOException, YarnException { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| |
| CapacityScheduler cs = |
| (CapacityScheduler) rm.getRMContext().getScheduler(); |
| int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications(); |
| |
| try { |
| rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance()); |
| Assert.assertEquals(maxAppsBefore, cs.getConfiguration() |
| .getMaximumSystemApplications()); |
| } catch (Exception ex) { |
| fail("Using localConfigurationProvider. Should not get any exception."); |
| } |
| } |
| |
| |
| @Test |
| public void testFileSystemCloseWithFileSystemBasedConfigurationProvider() |
| throws Exception { |
| MiniDFSCluster hdfsCluster = null; |
| try { |
| HdfsConfiguration hdfsConfig = new HdfsConfiguration(); |
| hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig) |
| .numDataNodes(1).build(); |
| FileSystem fs1 = hdfsCluster.getFileSystem(); |
| ConfigurationProvider configurationProvider = new |
| FileSystemBasedConfigurationProvider(); |
| configurationProvider.init(hdfsConfig); |
| fs1.close(); |
| try { |
| configurationProvider.getConfigurationInputStream(hdfsConfig, |
| "yarn-site.xml"); |
| } catch (IOException e) { |
| if (e.getMessage().contains("Filesystem closed")) { |
| fail("FileSystemBasedConfigurationProvider failed to handle " + |
| "FileSystem close"); |
| } else { |
| fail("Should not get any exceptions"); |
| } |
| } |
| } finally { |
| if (hdfsCluster != null) { |
| hdfsCluster.shutdown(); |
| } |
| } |
| } |
| |
| |
| @Test |
| public void testAdminRefreshQueuesWithFileSystemBasedConfigurationProvider() |
| throws IOException, YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| //upload default configurations |
| uploadDefaultConfiguration(); |
| |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| } catch(Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| CapacityScheduler cs = |
| (CapacityScheduler) rm.getRMContext().getScheduler(); |
| int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications(); |
| |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| csConf.set(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS, |
| "5000"); |
| uploadConfiguration(csConf, "capacity-scheduler.xml"); |
| |
| rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance()); |
| |
| int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications(); |
| Assert.assertEquals(maxAppsAfter, 5000); |
| Assert.assertTrue(maxAppsAfter != maxAppsBefore); |
| } |
| |
| @Test |
| public void testAdminRefreshQueuesWithMutableSchedulerConfiguration() { |
| configuration.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, |
| YarnConfiguration.MEMORY_CONFIGURATION_STORE); |
| |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| try { |
| rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance()); |
| fail("Expected exception while calling refreshQueues when scheduler" + |
| " configuration is mutable."); |
| } catch (Exception ex) { |
| assertTrue(ex.getMessage().endsWith("Scheduler configuration is " + |
| "mutable. refreshQueues is not allowed in this scenario.")); |
| } |
| } |
| |
| @Test |
| public void testAdminRefreshNodesWithoutConfiguration() |
| throws IOException, YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| // upload default configurations |
| uploadDefaultConfiguration(); |
| |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| try { |
| rm.adminService |
| .refreshNodesResources(RefreshNodesResourcesRequest.newInstance()); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions even when no configurations " |
| + "are done for node resources refresh"); |
| } |
| } |
| |
| @Test |
| public void testRefreshNodesResourceWithFileSystemBasedConfigurationProvider() |
| throws IOException, YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| //upload default configurations |
| uploadDefaultConfiguration(); |
| |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| rm.registerNode("h1:1234", 5120); |
| } catch(Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| NodeId nid = NodeId.fromString("h1:1234"); |
| RMNode ni = rm.getRMContext().getRMNodes().get(nid); |
| Resource resource = ni.getTotalCapability(); |
| Assert.assertEquals("<memory:5120, vCores:5>", resource.toString()); |
| |
| DynamicResourceConfiguration drConf = |
| new DynamicResourceConfiguration(); |
| drConf.set(PREFIX + NODES, "h1:1234"); |
| drConf.set(PREFIX + "h1:1234.vcores", "4"); |
| drConf.set(PREFIX + "h1:1234.memory", "4096"); |
| uploadConfiguration(drConf, "dynamic-resources.xml"); |
| |
| rm.adminService.refreshNodesResources( |
| RefreshNodesResourcesRequest.newInstance()); |
| rm.drainEvents(); |
| |
| RMNode niAfter = rm.getRMContext().getRMNodes().get(nid); |
| Resource resourceAfter = niAfter.getTotalCapability(); |
| Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString()); |
| } |
| |
| @Test |
| public void testRefreshNodesResourceWithResourceReturnInRegistration() |
| throws IOException, YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| //upload default configurations |
| uploadDefaultConfiguration(); |
| |
| MockNM nm = null; |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| nm = rm.registerNode("h1:1234", 2048, 2); |
| } catch(Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| NodeId nid = NodeId.fromString("h1:1234"); |
| RMNode ni = rm.getRMContext().getRMNodes().get(nid); |
| Resource resource = ni.getTotalCapability(); |
| Assert.assertEquals("<memory:2048, vCores:2>", resource.toString()); |
| |
| DynamicResourceConfiguration drConf = |
| new DynamicResourceConfiguration(); |
| drConf.set(PREFIX + NODES, "h1:1234"); |
| drConf.set(PREFIX + "h1:1234.vcores", "4"); |
| drConf.set(PREFIX + "h1:1234.memory", "4096"); |
| uploadConfiguration(drConf, "dynamic-resources.xml"); |
| |
| rm.adminService.refreshNodesResources( |
| RefreshNodesResourcesRequest.newInstance()); |
| |
| try { |
| // register the same node again with original resource. |
| // validate this will get new resource back; |
| nm.registerNode(); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| RMNode niAfter = rm.getRMContext().getRMNodes().get(nid); |
| Resource resourceAfter = niAfter.getTotalCapability(); |
| Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString()); |
| |
| Assert.assertEquals(4096, nm.getMemory()); |
| Assert.assertEquals(4, nm.getvCores()); |
| } |
| |
| @Test |
| public void testRefreshNodesResourceWithResourceReturnInHeartbeat() |
| throws IOException, YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| //upload default configurations |
| uploadDefaultConfiguration(); |
| |
| MockNM nm = null; |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| nm = rm.registerNode("h1:1234", 2048, 2); |
| } catch(Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| NodeId nid = NodeId.fromString("h1:1234"); |
| RMNode ni = rm.getRMContext().getRMNodes().get(nid); |
| Resource resource = ni.getTotalCapability(); |
| Assert.assertEquals("<memory:2048, vCores:2>", resource.toString()); |
| |
| DynamicResourceConfiguration drConf = |
| new DynamicResourceConfiguration(); |
| drConf.set(PREFIX + NODES, "h1:1234"); |
| drConf.set(PREFIX + "h1:1234.vcores", "4"); |
| drConf.set(PREFIX + "h1:1234.memory", "4096"); |
| uploadConfiguration(drConf, "dynamic-resources.xml"); |
| |
| rm.adminService.refreshNodesResources( |
| RefreshNodesResourcesRequest.newInstance()); |
| |
| try { |
| // NM-RM heartbeat, validate that this will get new resource back. |
| nm.nodeHeartbeat(true); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| RMNode niAfter = rm.getRMContext().getRMNodes().get(nid); |
| Resource resourceAfter = niAfter.getTotalCapability(); |
| Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString()); |
| |
| Assert.assertEquals(4096, nm.getMemory()); |
| Assert.assertEquals(4, nm.getvCores()); |
| } |
| |
| @Test |
| public void testResourcePersistentForNMRegistrationWithNewResource() |
| throws IOException, YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| //upload default configurations |
| uploadDefaultConfiguration(); |
| |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| rm.registerNode("h1:1234", 5120); |
| } catch(Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| NodeId nid = NodeId.fromString("h1:1234"); |
| RMNode ni = rm.getRMContext().getRMNodes().get(nid); |
| Resource resource = ni.getTotalCapability(); |
| Assert.assertEquals("<memory:5120, vCores:5>", resource.toString()); |
| |
| DynamicResourceConfiguration drConf = |
| new DynamicResourceConfiguration(); |
| drConf.set(PREFIX + NODES, "h1:1234"); |
| drConf.set(PREFIX + "h1:1234.vcores", "4"); |
| drConf.set(PREFIX + "h1:1234.memory", "4096"); |
| uploadConfiguration(drConf, "dynamic-resources.xml"); |
| |
| rm.adminService.refreshNodesResources( |
| RefreshNodesResourcesRequest.newInstance()); |
| |
| try { |
| // register the same node again with a different resource. |
| // validate this won't work as resource cached in RM side. |
| rm.registerNode("h1:1234", 8192, 8); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| RMNode niAfter = rm.getRMContext().getRMNodes().get(nid); |
| Resource resourceAfter = niAfter.getTotalCapability(); |
| Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString()); |
| |
| // Replace original dr file with an empty dr file, and validate node |
| // registration with new resources will take effective now. |
| deleteOnRemoteFileSystem("dynamic-resources.xml"); |
| DynamicResourceConfiguration emptyDRConf = |
| new DynamicResourceConfiguration(); |
| |
| uploadConfiguration(emptyDRConf, "dynamic-resources.xml"); |
| rm.adminService.refreshNodesResources( |
| RefreshNodesResourcesRequest.newInstance()); |
| try { |
| // register the same node third time, this time the register resource |
| // should work. |
| rm.registerNode("h1:1234", 8192, 8); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| niAfter = rm.getRMContext().getRMNodes().get(nid); |
| resourceAfter = niAfter.getTotalCapability(); |
| // new resource in registration should take effective as we empty |
| // dynamic resource file already. |
| Assert.assertEquals("<memory:8192, vCores:8>", resourceAfter.toString()); |
| } |
| |
| @Test |
| public void testAdminAclsWithLocalConfigurationProvider() { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| |
| try { |
| rm.adminService.refreshAdminAcls(RefreshAdminAclsRequest.newInstance()); |
| } catch (Exception ex) { |
| fail("Using localConfigurationProvider. Should not get any exception."); |
| } |
| } |
| |
| @Test |
| public void testAdminAclsWithFileSystemBasedConfigurationProvider() |
| throws IOException, YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| //upload default configurations |
| uploadDefaultConfiguration(); |
| |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| } catch(Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| String aclStringBefore = |
| rm.adminService.getAccessControlList().getAclString().trim(); |
| |
| YarnConfiguration yarnConf = new YarnConfiguration(); |
| yarnConf.set(YarnConfiguration.YARN_ADMIN_ACL, "world:anyone:rwcda"); |
| uploadConfiguration(yarnConf, "yarn-site.xml"); |
| |
| rm.adminService.refreshAdminAcls(RefreshAdminAclsRequest.newInstance()); |
| |
| String aclStringAfter = |
| rm.adminService.getAccessControlList().getAclString().trim(); |
| |
| Assert.assertTrue(!aclStringAfter.equals(aclStringBefore)); |
| Assert.assertEquals(aclStringAfter, "world:anyone:rwcda," + |
| UserGroupInformation.getCurrentUser().getShortUserName()); |
| } |
| |
| @Test |
| public void testServiceAclsRefreshWithLocalConfigurationProvider() { |
| configuration.setBoolean( |
| CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true); |
| ResourceManager resourceManager = null; |
| |
| try { |
| resourceManager = new ResourceManager(); |
| resourceManager.init(configuration); |
| resourceManager.start(); |
| resourceManager.adminService.refreshServiceAcls(RefreshServiceAclsRequest |
| .newInstance()); |
| } catch (Exception ex) { |
| fail("Using localConfigurationProvider. Should not get any exception."); |
| } finally { |
| if (resourceManager != null) { |
| resourceManager.stop(); |
| } |
| } |
| } |
| |
| @Test |
| public void testServiceAclsRefreshWithFileSystemBasedConfigurationProvider() |
| throws IOException, YarnException { |
| configuration.setBoolean( |
| CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true); |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| ResourceManager resourceManager = null; |
| try { |
| |
| //upload default configurations |
| uploadDefaultConfiguration(); |
| Configuration conf = new Configuration(); |
| conf.setBoolean( |
| CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true); |
| uploadConfiguration(conf, "core-site.xml"); |
| try { |
| resourceManager = new ResourceManager(); |
| resourceManager.init(configuration); |
| resourceManager.start(); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| String aclsString = "alice,bob users,wheel"; |
| Configuration newConf = new Configuration(); |
| newConf.set("security.applicationclient.protocol.acl", aclsString); |
| uploadConfiguration(newConf, "hadoop-policy.xml"); |
| |
| resourceManager.adminService.refreshServiceAcls(RefreshServiceAclsRequest |
| .newInstance()); |
| |
| // verify service Acls refresh for AdminService |
| ServiceAuthorizationManager adminServiceServiceManager = |
| resourceManager.adminService.getServer() |
| .getServiceAuthorizationManager(); |
| verifyServiceACLsRefresh(adminServiceServiceManager, |
| org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class, |
| aclsString); |
| |
| // verify service ACLs refresh for ClientRMService |
| ServiceAuthorizationManager clientRMServiceServiceManager = |
| resourceManager.getRMContext().getClientRMService().getServer() |
| .getServiceAuthorizationManager(); |
| verifyServiceACLsRefresh(clientRMServiceServiceManager, |
| org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class, |
| aclsString); |
| |
| // verify service ACLs refresh for ApplicationMasterService |
| ServiceAuthorizationManager appMasterService = |
| resourceManager.getRMContext().getApplicationMasterService() |
| .getServer().getServiceAuthorizationManager(); |
| verifyServiceACLsRefresh(appMasterService, |
| org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class, |
| aclsString); |
| |
| // verify service ACLs refresh for ResourceTrackerService |
| ServiceAuthorizationManager RTService = |
| resourceManager.getRMContext().getResourceTrackerService() |
| .getServer().getServiceAuthorizationManager(); |
| verifyServiceACLsRefresh(RTService, |
| org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class, |
| aclsString); |
| } finally { |
| if (resourceManager != null) { |
| resourceManager.stop(); |
| } |
| } |
| } |
| |
| private void verifyServiceACLsRefresh(ServiceAuthorizationManager manager, |
| Class<?> protocol, String aclString) { |
| for (Class<?> protocolClass : manager.getProtocolsWithAcls()) { |
| AccessControlList accessList = |
| manager.getProtocolsAcls(protocolClass); |
| if (protocolClass == protocol) { |
| Assert.assertEquals(accessList.getAclString(), |
| aclString); |
| } else { |
| assertThat(accessList.getAclString()).isEqualTo("*"); |
| } |
| } |
| } |
| |
| @Test |
| public void |
| testRefreshSuperUserGroupsWithLocalConfigurationProvider() { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| |
| try { |
| rm.adminService.refreshSuperUserGroupsConfiguration( |
| RefreshSuperUserGroupsConfigurationRequest.newInstance()); |
| } catch (Exception ex) { |
| fail("Using localConfigurationProvider. Should not get any exception."); |
| } |
| } |
| |
| @Test |
| public void |
| testRefreshSuperUserGroupsWithFileSystemBasedConfigurationProvider() |
| throws IOException, YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| //upload default configurations |
| uploadDefaultConfiguration(); |
| |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| } catch(Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| Configuration coreConf = new Configuration(false); |
| coreConf.set("hadoop.proxyuser.test.groups", "test_groups"); |
| coreConf.set("hadoop.proxyuser.test.hosts", "test_hosts"); |
| uploadConfiguration(coreConf, "core-site.xml"); |
| |
| rm.adminService.refreshSuperUserGroupsConfiguration( |
| RefreshSuperUserGroupsConfigurationRequest.newInstance()); |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups() |
| .get("hadoop.proxyuser.test.groups").size() == 1); |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups() |
| .get("hadoop.proxyuser.test.groups").contains("test_groups")); |
| |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts() |
| .get("hadoop.proxyuser.test.hosts").size() == 1); |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts() |
| .get("hadoop.proxyuser.test.hosts").contains("test_hosts")); |
| |
| Configuration yarnConf = new Configuration(false); |
| yarnConf.set(RM_PROXY_USER_PREFIX + "test.groups", "test_groups_1"); |
| yarnConf.set(RM_PROXY_USER_PREFIX + "test.hosts", "test_hosts_1"); |
| uploadConfiguration(yarnConf, "yarn-site.xml"); |
| |
| // RM specific configs will overwrite the common ones |
| rm.adminService.refreshSuperUserGroupsConfiguration( |
| RefreshSuperUserGroupsConfigurationRequest.newInstance()); |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups() |
| .get("hadoop.proxyuser.test.groups").size() == 1); |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups() |
| .get("hadoop.proxyuser.test.groups").contains("test_groups_1")); |
| |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts() |
| .get("hadoop.proxyuser.test.hosts").size() == 1); |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts() |
| .get("hadoop.proxyuser.test.hosts").contains("test_hosts_1")); |
| } |
| |
| @Test |
| public void testRefreshUserToGroupsMappingsWithLocalConfigurationProvider() { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| try { |
| rm.adminService |
| .refreshUserToGroupsMappings(RefreshUserToGroupsMappingsRequest |
| .newInstance()); |
| } catch (Exception ex) { |
| fail("Using localConfigurationProvider. Should not get any exception."); |
| } |
| } |
| |
| @Test |
| public void |
| testRefreshUserToGroupsMappingsWithFileSystemBasedConfigurationProvider() |
| throws IOException, YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| String[] defaultTestUserGroups = {"dummy_group1", "dummy_group2"}; |
| UserGroupInformation ugi = UserGroupInformation.createUserForTesting |
| ("dummyUser", defaultTestUserGroups); |
| |
| String user = ugi.getUserName(); |
| List<String> groupWithInit = new ArrayList<String>(2); |
| for(int i = 0; i < ugi.getGroupNames().length; i++ ) { |
| groupWithInit.add(ugi.getGroupNames()[i]); |
| } |
| |
| // upload default configurations |
| uploadDefaultConfiguration(); |
| Configuration conf = new Configuration(); |
| conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, |
| MockUnixGroupsMapping.class, |
| GroupMappingServiceProvider.class); |
| uploadConfiguration(conf, "core-site.xml"); |
| |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| // Make sure RM will use the updated GroupMappingServiceProvider |
| Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(conf).refresh(); |
| List<String> groupBefore = |
| new ArrayList<String>(Groups.getUserToGroupsMappingService( |
| configuration).getGroups(user)); |
| Assert.assertTrue(groupBefore.contains("test_group_A") |
| && groupBefore.contains("test_group_B") |
| && groupBefore.contains("test_group_C") && groupBefore.size() == 3); |
| Assert.assertTrue(groupWithInit.size() != groupBefore.size()); |
| Assert.assertFalse(groupWithInit.contains("test_group_A") |
| || groupWithInit.contains("test_group_B") |
| || groupWithInit.contains("test_group_C")); |
| |
| // update the groups |
| MockUnixGroupsMapping.updateGroups(); |
| |
| rm.adminService |
| .refreshUserToGroupsMappings(RefreshUserToGroupsMappingsRequest |
| .newInstance()); |
| List<String> groupAfter = |
| Groups.getUserToGroupsMappingService(configuration).getGroups(user); |
| |
| // should get the updated groups |
| Assert.assertTrue(groupAfter.contains("test_group_D") |
| && groupAfter.contains("test_group_E") |
| && groupAfter.contains("test_group_F") && groupAfter.size() == 3); |
| |
| } |
| |
| @Test |
| public void testRefreshNodesWithLocalConfigurationProvider() { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| |
| try { |
| rm.adminService.refreshNodes(RefreshNodesRequest |
| .newInstance(DecommissionType.NORMAL)); |
| } catch (Exception ex) { |
| fail("Using localConfigurationProvider. Should not get any exception."); |
| } |
| } |
| |
| @Test |
| public void testRefreshNodesWithFileSystemBasedConfigurationProvider() |
| throws IOException, YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| // upload default configurations |
| uploadDefaultConfiguration(); |
| |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| final File excludeHostsFile = new File(tmpDir.toString(), "excludeHosts"); |
| if (excludeHostsFile.exists()) { |
| excludeHostsFile.delete(); |
| } |
| if (!excludeHostsFile.createNewFile()) { |
| Assert.fail("Can not create " + "excludeHosts"); |
| } |
| PrintWriter fileWriter = new PrintWriter(excludeHostsFile); |
| fileWriter.write("0.0.0.0:123"); |
| fileWriter.close(); |
| |
| uploadToRemoteFileSystem(new Path(excludeHostsFile.getAbsolutePath())); |
| |
| Configuration yarnConf = new YarnConfiguration(); |
| yarnConf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, this.workingPath |
| + "/excludeHosts"); |
| uploadConfiguration(yarnConf, YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); |
| |
| rm.adminService.refreshNodes(RefreshNodesRequest |
| .newInstance(DecommissionType.NORMAL)); |
| Set<String> excludeHosts = |
| rm.getNodesListManager().getHostsReader().getExcludedHosts(); |
| Assert.assertTrue(excludeHosts.size() == 1); |
| Assert.assertTrue(excludeHosts.contains("0.0.0.0:123")); |
| } |
| |
| @Test |
| public void testRMHAWithFileSystemBasedConfiguration() throws IOException, |
| YarnException { |
| StateChangeRequestInfo requestInfo = new StateChangeRequestInfo( |
| HAServiceProtocol.RequestSource.REQUEST_BY_USER); |
| updateConfigurationForRMHA(); |
| Configuration conf1 = new Configuration(configuration); |
| conf1.set(YarnConfiguration.RM_HA_ID, "rm1"); |
| Configuration conf2 = new Configuration(configuration); |
| conf2.set(YarnConfiguration.RM_HA_ID, "rm2"); |
| |
| // upload default configurations |
| uploadDefaultConfiguration(); |
| |
| MockRM rm1 = null; |
| MockRM rm2 = null; |
| try { |
| rm1 = new MockRM(conf1); |
| rm1.init(conf1); |
| rm1.start(); |
| Assert.assertTrue(rm1.getRMContext().getHAServiceState() |
| == HAServiceState.STANDBY); |
| |
| rm2 = new MockRM(conf2); |
| rm2.init(conf1); |
| rm2.start(); |
| Assert.assertTrue(rm2.getRMContext().getHAServiceState() |
| == HAServiceState.STANDBY); |
| |
| rm1.adminService.transitionToActive(requestInfo); |
| Assert.assertTrue(rm1.getRMContext().getHAServiceState() |
| == HAServiceState.ACTIVE); |
| |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| csConf.set(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS, |
| "5000"); |
| uploadConfiguration(csConf, "capacity-scheduler.xml"); |
| |
| rm1.adminService.refreshQueues(RefreshQueuesRequest.newInstance()); |
| |
| int maxApps = |
| ((CapacityScheduler) rm1.getRMContext().getScheduler()) |
| .getConfiguration().getMaximumSystemApplications(); |
| Assert.assertEquals(maxApps, 5000); |
| |
| // Before failover happens, the maxApps is |
| // still the default value on the standby rm : rm2 |
| int maxAppsBeforeFailOver = |
| ((CapacityScheduler) rm2.getRMContext().getScheduler()) |
| .getConfiguration().getMaximumSystemApplications(); |
| Assert.assertEquals(maxAppsBeforeFailOver, 10000); |
| |
| // Do the failover |
| rm1.adminService.transitionToStandby(requestInfo); |
| rm2.adminService.transitionToActive(requestInfo); |
| Assert.assertTrue(rm1.getRMContext().getHAServiceState() |
| == HAServiceState.STANDBY); |
| Assert.assertTrue(rm2.getRMContext().getHAServiceState() |
| == HAServiceState.ACTIVE); |
| |
| int maxAppsAfter = |
| ((CapacityScheduler) rm2.getRMContext().getScheduler()) |
| .getConfiguration().getMaximumSystemApplications(); |
| |
| Assert.assertEquals(maxAppsAfter, 5000); |
| } finally { |
| if (rm1 != null) { |
| rm1.stop(); |
| } |
| if (rm2 != null) { |
| rm2.stop(); |
| } |
| } |
| } |
| |
| /** |
| * Test that a configuration with no leader election configured fails. |
| */ |
| @Test |
| public void testHAConfWithoutLeaderElection() { |
| Configuration conf = new Configuration(configuration); |
| |
| conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); |
| conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true); |
| conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED, false); |
| conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, false); |
| conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); |
| |
| int base = 100; |
| |
| for (String confKey : |
| YarnConfiguration.getServiceAddressConfKeys(configuration)) { |
| conf.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:" |
| + (base + 20)); |
| conf.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:" |
| + (base + 40)); |
| base = base * 2; |
| } |
| |
| conf.set(YarnConfiguration.RM_HA_ID, "rm1"); |
| |
| checkBadConfiguration(conf); |
| } |
| |
| /** |
| * Test that a configuration with a single RM fails. |
| */ |
| @Test |
| public void testHAConfWithSingleRMID() { |
| Configuration conf = new Configuration(configuration); |
| |
| conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); |
| conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true); |
| conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED, true); |
| conf.set(YarnConfiguration.RM_HA_IDS, "rm1"); |
| |
| int base = 100; |
| |
| for (String confKey : |
| YarnConfiguration.getServiceAddressConfKeys(configuration)) { |
| conf.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:" |
| + (base + 20)); |
| base = base * 2; |
| } |
| |
| conf.set(YarnConfiguration.RM_HA_ID, "rm1"); |
| |
| checkBadConfiguration(conf); |
| } |
| |
| /** |
| * Test that a configuration with no service information fails. |
| */ |
| @Test |
| public void testHAConfWithoutServiceInfo() { |
| Configuration conf = new Configuration(configuration); |
| |
| conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); |
| conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true); |
| conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED, true); |
| conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); |
| |
| conf.set(YarnConfiguration.RM_HA_ID, "rm1"); |
| |
| checkBadConfiguration(conf); |
| } |
| |
| @Test |
| public void testRMStartsWithoutConfigurationFilesProvided() { |
| // enable FileSystemBasedConfigurationProvider without uploading |
| // any configuration files into Remote File System. |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| // The configurationProvider will return NULL instead of |
| // throwing out Exceptions, if there are no configuration files provided. |
| // RM will not load the remote Configuration files, |
| // and should start successfully. |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| } |
| |
| @Test |
| public void testRMInitialsWithFileSystemBasedConfigurationProvider() |
| throws Exception { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| // upload configurations |
| final File excludeHostsFile = new File(tmpDir.toString(), "excludeHosts"); |
| if (excludeHostsFile.exists()) { |
| excludeHostsFile.delete(); |
| } |
| if (!excludeHostsFile.createNewFile()) { |
| Assert.fail("Can not create " + "excludeHosts"); |
| } |
| PrintWriter fileWriter = new PrintWriter(excludeHostsFile); |
| fileWriter.write("0.0.0.0:123"); |
| fileWriter.close(); |
| uploadToRemoteFileSystem(new Path(excludeHostsFile.getAbsolutePath())); |
| |
| YarnConfiguration yarnConf = new YarnConfiguration(); |
| yarnConf.set(YarnConfiguration.YARN_ADMIN_ACL, "world:anyone:rwcda"); |
| yarnConf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, this.workingPath |
| + "/excludeHosts"); |
| uploadConfiguration(yarnConf, "yarn-site.xml"); |
| |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| csConf.set(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS, |
| "5000"); |
| uploadConfiguration(csConf, "capacity-scheduler.xml"); |
| |
| String aclsString = "alice,bob users,wheel"; |
| Configuration newConf = new Configuration(); |
| newConf.set("security.applicationclient.protocol.acl", aclsString); |
| uploadConfiguration(newConf, "hadoop-policy.xml"); |
| |
| Configuration conf = new Configuration(); |
| conf.setBoolean( |
| CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true); |
| conf.set("hadoop.proxyuser.test.groups", "test_groups"); |
| conf.set("hadoop.proxyuser.test.hosts", "test_hosts"); |
| conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, |
| MockUnixGroupsMapping.class, |
| GroupMappingServiceProvider.class); |
| uploadConfiguration(conf, "core-site.xml"); |
| |
| // update the groups |
| MockUnixGroupsMapping.updateGroups(); |
| |
| ResourceManager resourceManager = null; |
| try { |
| try { |
| resourceManager = new ResourceManager(); |
| resourceManager.init(configuration); |
| resourceManager.start(); |
| } catch (Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| // validate values for excludeHosts |
| Set<String> excludeHosts = |
| resourceManager.getRMContext().getNodesListManager() |
| .getHostsReader().getExcludedHosts(); |
| Assert.assertTrue(excludeHosts.size() == 1); |
| Assert.assertTrue(excludeHosts.contains("0.0.0.0:123")); |
| |
| // validate values for admin-acls |
| String aclStringAfter = |
| resourceManager.adminService.getAccessControlList() |
| .getAclString().trim(); |
| Assert.assertEquals(aclStringAfter, "world:anyone:rwcda," + |
| UserGroupInformation.getCurrentUser().getShortUserName()); |
| |
| // validate values for queue configuration |
| CapacityScheduler cs = |
| (CapacityScheduler) resourceManager.getRMContext().getScheduler(); |
| int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications(); |
| Assert.assertEquals(maxAppsAfter, 5000); |
| |
| // verify service Acls for AdminService |
| ServiceAuthorizationManager adminServiceServiceManager = |
| resourceManager.adminService.getServer() |
| .getServiceAuthorizationManager(); |
| verifyServiceACLsRefresh(adminServiceServiceManager, |
| org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class, |
| aclsString); |
| |
| // verify service ACLs for ClientRMService |
| ServiceAuthorizationManager clientRMServiceServiceManager = |
| resourceManager.getRMContext().getClientRMService().getServer() |
| .getServiceAuthorizationManager(); |
| verifyServiceACLsRefresh(clientRMServiceServiceManager, |
| org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class, |
| aclsString); |
| |
| // verify service ACLs for ApplicationMasterService |
| ServiceAuthorizationManager appMasterService = |
| resourceManager.getRMContext().getApplicationMasterService() |
| .getServer().getServiceAuthorizationManager(); |
| verifyServiceACLsRefresh(appMasterService, |
| org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class, |
| aclsString); |
| |
| // verify service ACLs for ResourceTrackerService |
| ServiceAuthorizationManager RTService = |
| resourceManager.getRMContext().getResourceTrackerService() |
| .getServer().getServiceAuthorizationManager(); |
| verifyServiceACLsRefresh(RTService, |
| org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.class, |
| aclsString); |
| |
| // verify ProxyUsers and ProxyHosts |
| ProxyUsers.refreshSuperUserGroupsConfiguration(configuration); |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups() |
| .get("hadoop.proxyuser.test.groups").size() == 1); |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyGroups() |
| .get("hadoop.proxyuser.test.groups").contains("test_groups")); |
| |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts() |
| .get("hadoop.proxyuser.test.hosts").size() == 1); |
| Assert.assertTrue(ProxyUsers.getDefaultImpersonationProvider().getProxyHosts() |
| .get("hadoop.proxyuser.test.hosts").contains("test_hosts")); |
| |
| // verify UserToGroupsMappings |
| Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(conf).refresh(); |
| List<String> groupAfter = |
| Groups.getUserToGroupsMappingService(configuration).getGroups( |
| UserGroupInformation.getCurrentUser().getUserName()); |
| Assert.assertTrue(groupAfter.contains("test_group_D") |
| && groupAfter.contains("test_group_E") |
| && groupAfter.contains("test_group_F") && groupAfter.size() == 3); |
| } finally { |
| if (resourceManager != null) { |
| resourceManager.stop(); |
| } |
| } |
| } |
| |
| /* For verifying fix for YARN-3804 */ |
| @Test |
| public void testRefreshAclWithDaemonUser() throws Exception { |
| String daemonUser = |
| UserGroupInformation.getCurrentUser().getShortUserName(); |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| uploadDefaultConfiguration(); |
| YarnConfiguration yarnConf = new YarnConfiguration(); |
| yarnConf.set(YarnConfiguration.YARN_ADMIN_ACL, daemonUser + "xyz"); |
| uploadConfiguration(yarnConf, "yarn-site.xml"); |
| |
| try { |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| } catch(Exception ex) { |
| fail("Should not get any exceptions"); |
| } |
| |
| assertEquals(daemonUser + "xyz," + daemonUser, |
| rm.adminService.getAccessControlList().getAclString().trim()); |
| |
| yarnConf = new YarnConfiguration(); |
| yarnConf.set(YarnConfiguration.YARN_ADMIN_ACL, daemonUser + "abc"); |
| uploadConfiguration(yarnConf, "yarn-site.xml"); |
| try { |
| rm.adminService.refreshAdminAcls(RefreshAdminAclsRequest.newInstance()); |
| } catch (YarnException e) { |
| if (e.getCause() != null && |
| e.getCause() instanceof AccessControlException) { |
| fail("Refresh should not have failed due to incorrect ACL"); |
| } |
| throw e; |
| } |
| |
| assertEquals(daemonUser + "abc," + daemonUser, |
| rm.adminService.getAccessControlList().getAclString().trim()); |
| } |
| |
| @Test |
| public void testModifyLabelsOnNodesWithDistributedConfigurationDisabled() |
| throws IOException, YarnException { |
| // create RM and set it's ACTIVE |
| MockRM rm = new MockRM(); |
| ((RMContextImpl) rm.getRMContext()) |
| .setHAServiceState(HAServiceState.ACTIVE); |
| RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager(); |
| |
| // by default, distributed configuration for node label is disabled, this |
| // should pass |
| labelMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); |
| rm.adminService.replaceLabelsOnNode(ReplaceLabelsOnNodeRequest |
| .newInstance(ImmutableMap.of(NodeId.newInstance("host", 0), |
| (Set<String>) ImmutableSet.of("x")))); |
| rm.close(); |
| } |
| |
| @Test(expected = YarnException.class) |
| public void testModifyLabelsOnNodesWithCentralizedConfigurationDisabled() |
| throws IOException, YarnException { |
| // create RM and set it's ACTIVE, and set distributed node label |
| // configuration to true |
| MockRM rm = new MockRM(); |
| rm.adminService.isCentralizedNodeLabelConfiguration = false; |
| |
| ((RMContextImpl) rm.getRMContext()) |
| .setHAServiceState(HAServiceState.ACTIVE); |
| RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager(); |
| |
| // by default, distributed configuration for node label is disabled, this |
| // should pass |
| labelMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); |
| rm.adminService.replaceLabelsOnNode(ReplaceLabelsOnNodeRequest |
| .newInstance(ImmutableMap.of(NodeId.newInstance("host", 0), |
| (Set<String>) ImmutableSet.of("x")))); |
| rm.close(); |
| } |
| |
| @Test |
| public void testModifyLabelsOnUnknownNodes() throws IOException, |
| YarnException { |
| // create RM and set it's ACTIVE, and set distributed node label |
| // configuration to true |
| rm = new MockRM(); |
| |
| ((RMContextImpl) rm.getRMContext()) |
| .setHAServiceState(HAServiceState.ACTIVE); |
| setActiveAndInactiveNodes(rm); |
| RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager(); |
| |
| // by default, distributed configuration for node label is disabled, this |
| // should pass |
| labelMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", |
| "y")); |
| // replace known node |
| ReplaceLabelsOnNodeRequest request1 = ReplaceLabelsOnNodeRequest |
| .newInstance(ImmutableMap.of(NodeId.newInstance("host1", 1111), |
| (Set<String>) ImmutableSet.of("x"))); |
| request1.setFailOnUnknownNodes(true); |
| try { |
| rm.adminService.replaceLabelsOnNode(request1); |
| } catch (Exception ex) { |
| fail("should not fail on known node"); |
| } |
| |
| // replace known node with wildcard port |
| ReplaceLabelsOnNodeRequest request2 = ReplaceLabelsOnNodeRequest |
| .newInstance(ImmutableMap.of(NodeId.newInstance("host1", 0), |
| (Set<String>) ImmutableSet.of("x"))); |
| request2.setFailOnUnknownNodes(true); |
| try { |
| rm.adminService.replaceLabelsOnNode(request2); |
| } catch (Exception ex) { |
| fail("should not fail on known node"); |
| } |
| |
| // replace unknown node |
| ReplaceLabelsOnNodeRequest request3 = ReplaceLabelsOnNodeRequest |
| .newInstance(ImmutableMap.of(NodeId.newInstance("host5", 0), |
| (Set<String>) ImmutableSet.of("x"))); |
| request3.setFailOnUnknownNodes(true); |
| try { |
| rm.adminService.replaceLabelsOnNode(request3); |
| fail("Should fail on unknown node"); |
| } catch (Exception ex) { |
| } |
| |
| // replace known node but wrong port |
| ReplaceLabelsOnNodeRequest request4 = ReplaceLabelsOnNodeRequest |
| .newInstance(ImmutableMap.of(NodeId.newInstance("host2", 1111), |
| (Set<String>) ImmutableSet.of("x"))); |
| request4.setFailOnUnknownNodes(true); |
| try { |
| rm.adminService.replaceLabelsOnNode(request4); |
| fail("Should fail on node with wrong port"); |
| } catch (Exception ex) { |
| } |
| |
| // replace non-exist node but not check |
| ReplaceLabelsOnNodeRequest request5 = ReplaceLabelsOnNodeRequest |
| .newInstance(ImmutableMap.of(NodeId.newInstance("host5", 0), |
| (Set<String>) ImmutableSet.of("x"))); |
| request5.setFailOnUnknownNodes(false); |
| try { |
| rm.adminService.replaceLabelsOnNode(request5); |
| } catch (Exception ex) { |
| fail("Should not fail on unknown node when " |
| + "fail-on-unkown-nodes is set false"); |
| } |
| |
| // replace on inactive node |
| ReplaceLabelsOnNodeRequest request6 = ReplaceLabelsOnNodeRequest |
| .newInstance(ImmutableMap.of(NodeId.newInstance("host4", 0), |
| (Set<String>) ImmutableSet.of("x"))); |
| request6.setFailOnUnknownNodes(true); |
| try { |
| rm.adminService.replaceLabelsOnNode(request6); |
| } catch (Exception ex) { |
| fail("should not fail on inactive node"); |
| } |
| |
| rm.close(); |
| } |
| |
| @Test |
| public void testRemoveClusterNodeLabelsWithCentralizedConfigurationDisabled() |
| throws IOException, YarnException { |
| // create RM and set it's ACTIVE |
| MockRM rm = new MockRM(); |
| ((RMContextImpl) rm.getRMContext()) |
| .setHAServiceState(HAServiceState.ACTIVE); |
| RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager(); |
| rm.adminService.isCentralizedNodeLabelConfiguration = false; |
| |
| // by default, distributed configuration for node label is disabled, this |
| // should pass |
| labelMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); |
| rm.adminService |
| .removeFromClusterNodeLabels(RemoveFromClusterNodeLabelsRequest |
| .newInstance((Set<String>) ImmutableSet.of("x"))); |
| |
| Set<String> clusterNodeLabels = labelMgr.getClusterNodeLabelNames(); |
| assertEquals(1,clusterNodeLabels.size()); |
| rm.close(); |
| } |
| |
| @Test(timeout = 30000) |
| public void testAdminRefreshClusterMaxPriority() throws Exception, |
| YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| uploadDefaultConfiguration(); |
| YarnConfiguration yarnConf = new YarnConfiguration(); |
| yarnConf.set(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, "5"); |
| uploadConfiguration(yarnConf, "yarn-site.xml"); |
| |
| rm = new MockRM(configuration); |
| rm.init(configuration); |
| rm.start(); |
| |
| CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); |
| Assert.assertEquals(5, cs.getMaxClusterLevelAppPriority().getPriority()); |
| |
| yarnConf = new YarnConfiguration(); |
| yarnConf |
| .set(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, "10"); |
| uploadConfiguration(yarnConf, "yarn-site.xml"); |
| |
| try { |
| rm.adminService |
| .refreshClusterMaxPriority(RefreshClusterMaxPriorityRequest |
| .newInstance()); |
| Assert.assertEquals(10, cs.getMaxClusterLevelAppPriority().getPriority()); |
| } catch (Exception ex) { |
| fail("Could not refresh cluster max priority."); |
| } |
| } |
| |
| private String writeConfigurationXML(Configuration conf, String confXMLName) |
| throws IOException { |
| DataOutputStream output = null; |
| try { |
| final File confFile = new File(tmpDir.toString(), confXMLName); |
| if (confFile.exists()) { |
| confFile.delete(); |
| } |
| if (!confFile.createNewFile()) { |
| Assert.fail("Can not create " + confXMLName); |
| } |
| output = new DataOutputStream( |
| new FileOutputStream(confFile)); |
| conf.writeXml(output); |
| return confFile.getAbsolutePath(); |
| } finally { |
| if (output != null) { |
| output.close(); |
| } |
| } |
| } |
| |
| private void uploadToRemoteFileSystem(Path filePath) |
| throws IOException { |
| fs.copyFromLocalFile(filePath, workingPath); |
| } |
| |
| private void uploadConfiguration(Configuration conf, String confFileName) |
| throws IOException { |
| String csConfFile = writeConfigurationXML(conf, confFileName); |
| // upload the file into Remote File System |
| uploadToRemoteFileSystem(new Path(csConfFile)); |
| } |
| |
| private void deleteOnRemoteFileSystem(String fileName) |
| throws IOException { |
| fs.delete(new Path(workingPath, fileName)); |
| } |
| |
| private void uploadDefaultConfiguration() throws IOException { |
| Configuration conf = new Configuration(); |
| uploadConfiguration(conf, "core-site.xml"); |
| |
| YarnConfiguration yarnConf = new YarnConfiguration(); |
| yarnConf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| uploadConfiguration(yarnConf, "yarn-site.xml"); |
| |
| CapacitySchedulerConfiguration csConf = |
| new CapacitySchedulerConfiguration(); |
| uploadConfiguration(csConf, "capacity-scheduler.xml"); |
| |
| Configuration hadoopPolicyConf = new Configuration(false); |
| hadoopPolicyConf |
| .addResource(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE); |
| uploadConfiguration(hadoopPolicyConf, "hadoop-policy.xml"); |
| } |
| |
| private static class MockUnixGroupsMapping implements |
| GroupMappingServiceProvider { |
| |
| private static List<String> group = new ArrayList<String>(); |
| |
| @Override |
| public List<String> getGroups(String user) throws IOException { |
| return group; |
| } |
| |
| @Override |
| public void cacheGroupsRefresh() throws IOException { |
| // Do nothing |
| } |
| |
| @Override |
| public void cacheGroupsAdd(List<String> groups) throws IOException { |
| // Do nothing |
| } |
| |
| @Override |
| public Set<String> getGroupsSet(String user) throws IOException { |
| return ImmutableSet.copyOf(group); |
| } |
| |
| public static void updateGroups() { |
| group.clear(); |
| group.add("test_group_D"); |
| group.add("test_group_E"); |
| group.add("test_group_F"); |
| } |
| |
| public static void resetGroups() { |
| group.clear(); |
| group.add("test_group_A"); |
| group.add("test_group_B"); |
| group.add("test_group_C"); |
| } |
| } |
| |
| @Test |
| public void testSecureRMBecomeActive() throws IOException, |
| YarnException { |
| StateChangeRequestInfo requestInfo = new StateChangeRequestInfo( |
| HAServiceProtocol.RequestSource.REQUEST_BY_USER); |
| updateConfigurationForRMHA(); |
| configuration.setBoolean( |
| CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true); |
| configuration.set(YarnConfiguration.RM_HA_ID, "rm1"); |
| // upload default configurations |
| uploadDefaultConfiguration(); |
| |
| ResourceManager resourceManager = new ResourceManager(); |
| try { |
| resourceManager.init(configuration); |
| resourceManager.start(); |
| Assert.assertTrue(resourceManager.getRMContext().getHAServiceState() |
| == HAServiceState.STANDBY); |
| resourceManager.adminService.transitionToActive(requestInfo); |
| } finally { |
| resourceManager.close(); |
| } |
| } |
| |
| private void updateConfigurationForRMHA() { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); |
| configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); |
| configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); |
| |
| int base = 1500; |
| for (String confKey : YarnConfiguration |
| .getServiceAddressConfKeys(configuration)) { |
| configuration.set(HAUtil.addSuffix(confKey, "rm1"), |
| "0.0.0.0:" + (base + 20)); |
| configuration.set(HAUtil.addSuffix(confKey, "rm2"), |
| "0.0.0.1:" + (base + 40)); |
| base = base * 2; |
| } |
| } |
| |
| /** |
| * This method initializes an RM with the given configuration and expects it |
| * to fail with a configuration error. |
| * |
| * @param conf the {@link Configuration} to use |
| */ |
| private void checkBadConfiguration(Configuration conf) { |
| MockRM rm1 = null; |
| |
| conf.set(YarnConfiguration.RM_HA_ID, "rm1"); |
| |
| try { |
| rm1 = new MockRM(conf); |
| rm1.init(conf); |
| fail("The RM allowed an invalid configuration"); |
| } catch (YarnRuntimeException e) { |
| assertTrue("The RM initialization threw an unexpected exception", |
| e.getMessage().startsWith(HAUtil.BAD_CONFIG_MESSAGE_PREFIX)); |
| } |
| } |
| |
| @Test(timeout = 30000) |
| public void testAdminAddToClusterNodeLabelsWithDeprecatedAPIs() |
| throws Exception, YarnException { |
| configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, |
| "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); |
| |
| uploadDefaultConfiguration(); |
| |
| rm = new MockRM(configuration) { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, |
| this.applicationACLsManager, this.queueACLsManager, |
| this.getRMContext().getRMDelegationTokenSecretManager()); |
| }; |
| }; |
| rm.init(configuration); |
| rm.start(); |
| |
| try { |
| List<String> list = new ArrayList<String>(); |
| list.add("a"); |
| list.add("b"); |
| AddToClusterNodeLabelsRequestProto proto = AddToClusterNodeLabelsRequestProto |
| .newBuilder().addAllDeprecatedNodeLabels(list).build(); |
| AddToClusterNodeLabelsRequestPBImpl protoImpl = new AddToClusterNodeLabelsRequestPBImpl( |
| proto); |
| rm.adminService |
| .addToClusterNodeLabels((AddToClusterNodeLabelsRequest) protoImpl); |
| } catch (Exception ex) { |
| fail("Could not update node labels." + ex); |
| } |
| |
| // Create a client. |
| Configuration conf = new Configuration(); |
| YarnRPC rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| ApplicationClientProtocol client = (ApplicationClientProtocol) rpc |
| .getProxy(ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Get node labels collection |
| GetClusterNodeLabelsResponse response = client |
| .getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance()); |
| NodeLabel labelX = NodeLabel.newInstance("a"); |
| NodeLabel labelY = NodeLabel.newInstance("b"); |
| Assert.assertTrue( |
| response.getNodeLabelList().containsAll(Arrays.asList(labelX, labelY))); |
| } |
| |
/**
 * Exercises AdminService.mapAttributesToNodes against a spied
 * NodeAttributesManager.
 *
 * <p>Covered cases, in order: ADD on host1; ADD then REMOVE on host4;
 * REMOVE of an attribute that no longer exists (must fail); ADD on unknown
 * host5 (must fail); ADD using "host:port" names (must fail); REPLACE on
 * host5 with failOnUnknownNodes disabled (must succeed); ADD with the
 * distributed prefix (must fail). The spy is used to confirm that the
 * matching manager method was invoked.
 *
 * @throws Exception if an unexpected failure occurs
 */
@Test(timeout = 30000)
public void testMapAttributesToNodes() throws Exception, YarnException {
  Configuration newConf = NodeAttributeTestUtils.getRandomDirConf(null);
  rm = new MockRM(newConf);

  NodeAttributesManager spiedAttributesManager =
      Mockito.spy(rm.getRMContext().getNodeAttributesManager());
  rm.getRMContext().setNodeAttributesManager(spiedAttributesManager);

  ((RMContextImpl) rm.getRMContext())
      .setHAServiceState(HAServiceState.ACTIVE);
  setActiveAndInactiveNodes(rm);

  // ADD on host1 must succeed and reach the manager once.
  NodesToAttributesMappingRequest request =
      newSingleAttributeMappingRequest(AttributeMappingOperationType.ADD,
          NodeAttribute.PREFIX_CENTRALIZED, true, "host1");
  try {
    rm.adminService.mapAttributesToNodes(request);
  } catch (Exception ex) {
    fail("should not fail on known node in active state" + ex.getMessage());
  }
  Mockito.verify(spiedAttributesManager, Mockito.times(1))
      .addNodeAttributes(Mockito.anyMap());

  // add attributes for later removals
  request = newSingleAttributeMappingRequest(
      AttributeMappingOperationType.ADD,
      NodeAttribute.PREFIX_CENTRALIZED, true, "host4");
  rm.adminService.mapAttributesToNodes(request);

  // REMOVE on host4 must succeed and reach the manager once.
  request = newSingleAttributeMappingRequest(
      AttributeMappingOperationType.REMOVE,
      NodeAttribute.PREFIX_CENTRALIZED, true, "host4");
  try {
    rm.adminService.mapAttributesToNodes(request);
  } catch (Exception ex) {
    fail("should not fail on known node in inactive state" + ex.getMessage());
  }
  Mockito.verify(spiedAttributesManager, Mockito.times(1))
      .removeNodeAttributes(Mockito.anyMap());

  // Assert node to attributes mappings are empty.
  Assert.assertTrue("Attributes of host4 should be empty",
      rm.getRMContext().getNodeAttributesManager()
          .getAttributesForNode("host4").isEmpty());

  // remove non existing attributes.
  request = newSingleAttributeMappingRequest(
      AttributeMappingOperationType.REMOVE,
      NodeAttribute.PREFIX_CENTRALIZED, true, "host4");
  try {
    rm.adminService.mapAttributesToNodes(request);
    fail("Should have failed for non exists attribute");
  } catch (Exception expected) {
    // Exception expected if attributes does not exist
  }

  // ADD on a host that is not registered must be rejected.
  request = newSingleAttributeMappingRequest(
      AttributeMappingOperationType.ADD,
      NodeAttribute.PREFIX_CENTRALIZED, true, "host5");
  try {
    rm.adminService.mapAttributesToNodes(request);
    fail("host5 is not a valid node, It should have failed");
  } catch (YarnException ex) {
    Assert.assertEquals("Exception Message is not as desired",
        " Following nodes does not exist : [host5]",
        ex.getCause().getMessage());
  }

  request = newSingleAttributeMappingRequest(
      AttributeMappingOperationType.ADD,
      NodeAttribute.PREFIX_CENTRALIZED, true, "host4:8889", "host2:8889");
  try {
    // port if added in CLI it fails in the client itself. Here we just check
    // against hostname hence the message as : nodes does not exist.
    rm.adminService.mapAttributesToNodes(request);
    fail("host with the port should fail as only hostnames are validated");
  } catch (YarnException ex) {
    Assert.assertEquals("Exception Message is not as desired",
        " Following nodes does not exist : [host4:8889, host2:8889]",
        ex.getCause().getMessage());
  }

  // REPLACE on an unknown host is accepted when the check is disabled.
  request = newSingleAttributeMappingRequest(
      AttributeMappingOperationType.REPLACE,
      NodeAttribute.PREFIX_CENTRALIZED, false, "host5");
  try {
    rm.adminService.mapAttributesToNodes(request);
  } catch (Exception ex) {
    fail("This operation should not fail as failOnUnknownNodes is false : "
        + ex.getMessage());
  }
  Mockito.verify(spiedAttributesManager, Mockito.times(1))
      .replaceNodeAttributes(Mockito.eq(NodeAttribute.PREFIX_CENTRALIZED),
          Mockito.anyMap());

  // 2. fail on invalid prefix
  request = newSingleAttributeMappingRequest(
      AttributeMappingOperationType.ADD,
      NodeAttribute.PREFIX_DISTRIBUTED, false, "host5");
  try {
    rm.adminService.mapAttributesToNodes(request);
    // The message names the required prefix, consistent with the exception
    // text asserted below.
    fail("This operation should fail as prefix should be \"rm.yarn.io\".");
  } catch (YarnException ex) {
    Assert.assertEquals("Exception Message is not as desired",
        "Invalid Attribute Mapping for the node host5. Prefix should be "
            + "rm.yarn.io", ex.getCause().getMessage());
  }

  rm.close();
}

/**
 * Builds a mapping request that assigns one STRING attribute named "x" with
 * value "dfasdf" and the given prefix to each of the given hosts. A new
 * attribute instance is created per host.
 *
 * @param operation the mapping operation to request
 * @param prefix the attribute prefix to use
 * @param failOnUnknownNodes value of the request's fail-on-unknown-nodes flag
 * @param hosts node names, in the order they should appear in the request
 * @return the assembled request
 */
private static NodesToAttributesMappingRequest
    newSingleAttributeMappingRequest(AttributeMappingOperationType operation,
    String prefix, boolean failOnUnknownNodes, String... hosts) {
  ImmutableList.Builder<NodeToAttributes> mappings = ImmutableList.builder();
  for (String host : hosts) {
    mappings.add(NodeToAttributes.newInstance(host,
        ImmutableList.of(NodeAttribute.newInstance(prefix, "x",
            NodeAttributeType.STRING, "dfasdf"))));
  }
  return NodesToAttributesMappingRequest.newInstance(operation,
      mappings.build(), failOnUnknownNodes);
}
| |
| private void setActiveAndInactiveNodes(ResourceManager resourceManager) { |
| Map<NodeId, RMNode> rmNodes = resourceManager.getRMContext().getRMNodes(); |
| rmNodes.put(NodeId.newInstance("host1", 1111), new RMNodeImpl(null, |
| resourceManager.getRMContext(), "host1", 0, 0, null, null, null)); |
| rmNodes.put(NodeId.newInstance("host2", 2222), new RMNodeImpl(null, |
| resourceManager.getRMContext(), "host2", 0, 0, null, null, null)); |
| rmNodes.put(NodeId.newInstance("host3", 3333), new RMNodeImpl(null, |
| resourceManager.getRMContext(), "host3", 0, 0, null, null, null)); |
| Map<NodeId, RMNode> rmInactiveNodes = |
| resourceManager.getRMContext().getInactiveRMNodes(); |
| rmInactiveNodes.put(NodeId.newInstance("host4", 4444), new RMNodeImpl(null, |
| resourceManager.getRMContext(), "host4", 0, 0, null, null, null)); |
| } |
| } |