| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.federation.store.impl; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Calendar; |
| import java.util.List; |
| import java.util.TimeZone; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; |
| import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; |
| import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; |
| import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; |
| import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; |
| import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; |
| import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; |
| import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; |
| import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; |
| import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; |
| import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; |
| import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; |
| import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; |
| import org.apache.hadoop.yarn.util.MonotonicClock; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| /** |
| * Base class for FederationMembershipStateStore implementations. |
| */ |
| public abstract class FederationStateStoreBaseTest { |
| |
| private static final MonotonicClock CLOCK = new MonotonicClock(); |
| private FederationStateStore stateStore = createStateStore(); |
| |
| protected abstract FederationStateStore createStateStore(); |
| |
| private Configuration conf; |
| |
| @Before |
| public void before() throws IOException, YarnException { |
| stateStore.init(conf); |
| } |
| |
| @After |
| public void after() throws Exception { |
| stateStore.close(); |
| } |
| |
| // Test FederationMembershipStateStore |
| |
| @Test |
| public void testRegisterSubCluster() throws Exception { |
| SubClusterId subClusterId = SubClusterId.newInstance("SC"); |
| |
| SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); |
| |
| long previousTimeStamp = |
| Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); |
| |
| SubClusterRegisterResponse result = stateStore.registerSubCluster( |
| SubClusterRegisterRequest.newInstance(subClusterInfo)); |
| |
| long currentTimeStamp = |
| Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); |
| |
| Assert.assertNotNull(result); |
| Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId)); |
| |
| // The saved heartbeat is between the old one and the current timestamp |
| Assert.assertTrue(querySubClusterInfo(subClusterId) |
| .getLastHeartBeat() <= currentTimeStamp); |
| Assert.assertTrue(querySubClusterInfo(subClusterId) |
| .getLastHeartBeat() >= previousTimeStamp); |
| } |
| |
| @Test |
| public void testDeregisterSubCluster() throws Exception { |
| SubClusterId subClusterId = SubClusterId.newInstance("SC"); |
| registerSubCluster(createSubClusterInfo(subClusterId)); |
| |
| SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest |
| .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED); |
| |
| stateStore.deregisterSubCluster(deregisterRequest); |
| |
| Assert.assertEquals(SubClusterState.SC_UNREGISTERED, |
| querySubClusterInfo(subClusterId).getState()); |
| } |
| |
| @Test |
| public void testDeregisterSubClusterUnknownSubCluster() throws Exception { |
| SubClusterId subClusterId = SubClusterId.newInstance("SC"); |
| |
| SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest |
| .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED); |
| try { |
| stateStore.deregisterSubCluster(deregisterRequest); |
| Assert.fail(); |
| } catch (FederationStateStoreException e) { |
| Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found")); |
| } |
| } |
| |
| @Test |
| public void testGetSubClusterInfo() throws Exception { |
| |
| SubClusterId subClusterId = SubClusterId.newInstance("SC"); |
| SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); |
| registerSubCluster(subClusterInfo); |
| |
| GetSubClusterInfoRequest request = |
| GetSubClusterInfoRequest.newInstance(subClusterId); |
| Assert.assertEquals(subClusterInfo, |
| stateStore.getSubCluster(request).getSubClusterInfo()); |
| } |
| |
| @Test |
| public void testGetSubClusterInfoUnknownSubCluster() throws Exception { |
| SubClusterId subClusterId = SubClusterId.newInstance("SC"); |
| GetSubClusterInfoRequest request = |
| GetSubClusterInfoRequest.newInstance(subClusterId); |
| |
| GetSubClusterInfoResponse response = stateStore.getSubCluster(request); |
| Assert.assertNull(response); |
| } |
| |
| @Test |
| public void testGetAllSubClustersInfo() throws Exception { |
| |
| SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); |
| SubClusterInfo subClusterInfo1 = createSubClusterInfo(subClusterId1); |
| |
| SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); |
| SubClusterInfo subClusterInfo2 = createSubClusterInfo(subClusterId2); |
| |
| stateStore.registerSubCluster( |
| SubClusterRegisterRequest.newInstance(subClusterInfo1)); |
| stateStore.registerSubCluster( |
| SubClusterRegisterRequest.newInstance(subClusterInfo2)); |
| |
| stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest |
| .newInstance(subClusterId1, SubClusterState.SC_RUNNING, "capability")); |
| stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest.newInstance( |
| subClusterId2, SubClusterState.SC_UNHEALTHY, "capability")); |
| |
| List<SubClusterInfo> subClustersActive = |
| stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true)) |
| .getSubClusters(); |
| List<SubClusterInfo> subClustersAll = |
| stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false)) |
| .getSubClusters(); |
| |
| // SC1 is the only active |
| Assert.assertEquals(1, subClustersActive.size()); |
| SubClusterInfo sc1 = subClustersActive.get(0); |
| Assert.assertEquals(subClusterId1, sc1.getSubClusterId()); |
| |
| // SC1 and SC2 are the SubCluster present into the StateStore |
| |
| Assert.assertEquals(2, subClustersAll.size()); |
| Assert.assertTrue(subClustersAll.contains(sc1)); |
| subClustersAll.remove(sc1); |
| SubClusterInfo sc2 = subClustersAll.get(0); |
| Assert.assertEquals(subClusterId2, sc2.getSubClusterId()); |
| } |
| |
| @Test |
| public void testSubClusterHeartbeat() throws Exception { |
| SubClusterId subClusterId = SubClusterId.newInstance("SC"); |
| registerSubCluster(createSubClusterInfo(subClusterId)); |
| |
| long previousHeartBeat = |
| querySubClusterInfo(subClusterId).getLastHeartBeat(); |
| |
| SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest |
| .newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability"); |
| stateStore.subClusterHeartbeat(heartbeatRequest); |
| |
| long currentTimeStamp = |
| Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); |
| |
| Assert.assertEquals(SubClusterState.SC_RUNNING, |
| querySubClusterInfo(subClusterId).getState()); |
| |
| // The saved heartbeat is between the old one and the current timestamp |
| Assert.assertTrue(querySubClusterInfo(subClusterId) |
| .getLastHeartBeat() <= currentTimeStamp); |
| Assert.assertTrue(querySubClusterInfo(subClusterId) |
| .getLastHeartBeat() >= previousHeartBeat); |
| } |
| |
| @Test |
| public void testSubClusterHeartbeatUnknownSubCluster() throws Exception { |
| SubClusterId subClusterId = SubClusterId.newInstance("SC"); |
| SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest |
| .newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability"); |
| |
| try { |
| stateStore.subClusterHeartbeat(heartbeatRequest); |
| Assert.fail(); |
| } catch (FederationStateStoreException e) { |
| Assert.assertTrue(e.getMessage() |
| .startsWith("SubCluster SC does not exist; cannot heartbeat")); |
| } |
| } |
| |
| // Test FederationApplicationHomeSubClusterStore |
| |
| @Test |
| public void testAddApplicationHomeSubCluster() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| SubClusterId subClusterId = SubClusterId.newInstance("SC"); |
| ApplicationHomeSubCluster ahsc = |
| ApplicationHomeSubCluster.newInstance(appId, subClusterId); |
| |
| AddApplicationHomeSubClusterRequest request = |
| AddApplicationHomeSubClusterRequest.newInstance(ahsc); |
| AddApplicationHomeSubClusterResponse response = |
| stateStore.addApplicationHomeSubCluster(request); |
| |
| Assert.assertEquals(subClusterId, response.getHomeSubCluster()); |
| Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId)); |
| |
| } |
| |
| @Test |
| public void testAddApplicationHomeSubClusterAppAlreadyExists() |
| throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); |
| addApplicationHomeSC(appId, subClusterId1); |
| |
| SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); |
| ApplicationHomeSubCluster ahsc2 = |
| ApplicationHomeSubCluster.newInstance(appId, subClusterId2); |
| |
| AddApplicationHomeSubClusterResponse response = |
| stateStore.addApplicationHomeSubCluster( |
| AddApplicationHomeSubClusterRequest.newInstance(ahsc2)); |
| |
| Assert.assertEquals(subClusterId1, response.getHomeSubCluster()); |
| Assert.assertEquals(subClusterId1, queryApplicationHomeSC(appId)); |
| |
| } |
| |
| @Test |
| public void testDeleteApplicationHomeSubCluster() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| SubClusterId subClusterId = SubClusterId.newInstance("SC"); |
| addApplicationHomeSC(appId, subClusterId); |
| |
| DeleteApplicationHomeSubClusterRequest delRequest = |
| DeleteApplicationHomeSubClusterRequest.newInstance(appId); |
| |
| DeleteApplicationHomeSubClusterResponse response = |
| stateStore.deleteApplicationHomeSubCluster(delRequest); |
| |
| Assert.assertNotNull(response); |
| try { |
| queryApplicationHomeSC(appId); |
| Assert.fail(); |
| } catch (FederationStateStoreException e) { |
| Assert.assertTrue(e.getMessage() |
| .startsWith("Application " + appId + " does not exist")); |
| } |
| |
| } |
| |
| @Test |
| public void testDeleteApplicationHomeSubClusterUnknownApp() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| DeleteApplicationHomeSubClusterRequest delRequest = |
| DeleteApplicationHomeSubClusterRequest.newInstance(appId); |
| |
| try { |
| stateStore.deleteApplicationHomeSubCluster(delRequest); |
| Assert.fail(); |
| } catch (FederationStateStoreException e) { |
| Assert.assertTrue(e.getMessage() |
| .startsWith("Application " + appId.toString() + " does not exist")); |
| } |
| } |
| |
| @Test |
| public void testGetApplicationHomeSubCluster() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| SubClusterId subClusterId = SubClusterId.newInstance("SC"); |
| addApplicationHomeSC(appId, subClusterId); |
| |
| GetApplicationHomeSubClusterRequest getRequest = |
| GetApplicationHomeSubClusterRequest.newInstance(appId); |
| |
| GetApplicationHomeSubClusterResponse result = |
| stateStore.getApplicationHomeSubCluster(getRequest); |
| |
| Assert.assertEquals(appId, |
| result.getApplicationHomeSubCluster().getApplicationId()); |
| Assert.assertEquals(subClusterId, |
| result.getApplicationHomeSubCluster().getHomeSubCluster()); |
| } |
| |
| @Test |
| public void testGetApplicationHomeSubClusterUnknownApp() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| GetApplicationHomeSubClusterRequest request = |
| GetApplicationHomeSubClusterRequest.newInstance(appId); |
| |
| try { |
| stateStore.getApplicationHomeSubCluster(request); |
| Assert.fail(); |
| } catch (FederationStateStoreException e) { |
| Assert.assertTrue(e.getMessage() |
| .startsWith("Application " + appId.toString() + " does not exist")); |
| } |
| } |
| |
| @Test |
| public void testGetApplicationsHomeSubCluster() throws Exception { |
| ApplicationId appId1 = ApplicationId.newInstance(1, 1); |
| SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); |
| ApplicationHomeSubCluster ahsc1 = |
| ApplicationHomeSubCluster.newInstance(appId1, subClusterId1); |
| |
| ApplicationId appId2 = ApplicationId.newInstance(1, 2); |
| SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); |
| ApplicationHomeSubCluster ahsc2 = |
| ApplicationHomeSubCluster.newInstance(appId2, subClusterId2); |
| |
| addApplicationHomeSC(appId1, subClusterId1); |
| addApplicationHomeSC(appId2, subClusterId2); |
| |
| GetApplicationsHomeSubClusterRequest getRequest = |
| GetApplicationsHomeSubClusterRequest.newInstance(); |
| |
| GetApplicationsHomeSubClusterResponse result = |
| stateStore.getApplicationsHomeSubCluster(getRequest); |
| |
| Assert.assertEquals(2, result.getAppsHomeSubClusters().size()); |
| Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc1)); |
| Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2)); |
| } |
| |
| @Test |
| public void testUpdateApplicationHomeSubCluster() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); |
| addApplicationHomeSC(appId, subClusterId1); |
| |
| SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); |
| ApplicationHomeSubCluster ahscUpdate = |
| ApplicationHomeSubCluster.newInstance(appId, subClusterId2); |
| |
| UpdateApplicationHomeSubClusterRequest updateRequest = |
| UpdateApplicationHomeSubClusterRequest.newInstance(ahscUpdate); |
| |
| UpdateApplicationHomeSubClusterResponse response = |
| stateStore.updateApplicationHomeSubCluster(updateRequest); |
| |
| Assert.assertNotNull(response); |
| Assert.assertEquals(subClusterId2, queryApplicationHomeSC(appId)); |
| } |
| |
| @Test |
| public void testUpdateApplicationHomeSubClusterUnknownApp() throws Exception { |
| ApplicationId appId = ApplicationId.newInstance(1, 1); |
| SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); |
| ApplicationHomeSubCluster ahsc = |
| ApplicationHomeSubCluster.newInstance(appId, subClusterId1); |
| |
| UpdateApplicationHomeSubClusterRequest updateRequest = |
| UpdateApplicationHomeSubClusterRequest.newInstance(ahsc); |
| |
| try { |
| stateStore.updateApplicationHomeSubCluster((updateRequest)); |
| Assert.fail(); |
| } catch (FederationStateStoreException e) { |
| Assert.assertTrue(e.getMessage() |
| .startsWith("Application " + appId.toString() + " does not exist")); |
| } |
| } |
| |
| // Test FederationPolicyStore |
| |
| @Test |
| public void testSetPolicyConfiguration() throws Exception { |
| SetSubClusterPolicyConfigurationRequest request = |
| SetSubClusterPolicyConfigurationRequest |
| .newInstance(createSCPolicyConf("Queue", "PolicyType")); |
| |
| SetSubClusterPolicyConfigurationResponse result = |
| stateStore.setPolicyConfiguration(request); |
| |
| Assert.assertNotNull(result); |
| Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"), |
| queryPolicy("Queue")); |
| |
| } |
| |
| @Test |
| public void testSetPolicyConfigurationUpdateExisting() throws Exception { |
| setPolicyConf("Queue", "PolicyType1"); |
| |
| SetSubClusterPolicyConfigurationRequest request2 = |
| SetSubClusterPolicyConfigurationRequest |
| .newInstance(createSCPolicyConf("Queue", "PolicyType2")); |
| SetSubClusterPolicyConfigurationResponse result = |
| stateStore.setPolicyConfiguration(request2); |
| |
| Assert.assertNotNull(result); |
| Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType2"), |
| queryPolicy("Queue")); |
| } |
| |
| @Test |
| public void testGetPolicyConfiguration() throws Exception { |
| setPolicyConf("Queue", "PolicyType"); |
| |
| GetSubClusterPolicyConfigurationRequest getRequest = |
| GetSubClusterPolicyConfigurationRequest.newInstance("Queue"); |
| GetSubClusterPolicyConfigurationResponse result = |
| stateStore.getPolicyConfiguration(getRequest); |
| |
| Assert.assertNotNull(result); |
| Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"), |
| result.getPolicyConfiguration()); |
| |
| } |
| |
| @Test |
| public void testGetPolicyConfigurationUnknownQueue() throws Exception { |
| |
| GetSubClusterPolicyConfigurationRequest request = |
| GetSubClusterPolicyConfigurationRequest.newInstance("Queue"); |
| |
| GetSubClusterPolicyConfigurationResponse response = |
| stateStore.getPolicyConfiguration(request); |
| Assert.assertNull(response); |
| } |
| |
| @Test |
| public void testGetPoliciesConfigurations() throws Exception { |
| setPolicyConf("Queue1", "PolicyType1"); |
| setPolicyConf("Queue2", "PolicyType2"); |
| |
| GetSubClusterPoliciesConfigurationsResponse response = |
| stateStore.getPoliciesConfigurations( |
| GetSubClusterPoliciesConfigurationsRequest.newInstance()); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(response.getPoliciesConfigs()); |
| |
| Assert.assertEquals(2, response.getPoliciesConfigs().size()); |
| |
| Assert.assertTrue(response.getPoliciesConfigs() |
| .contains(createSCPolicyConf("Queue1", "PolicyType1"))); |
| Assert.assertTrue(response.getPoliciesConfigs() |
| .contains(createSCPolicyConf("Queue2", "PolicyType2"))); |
| } |
| |
| // Convenience methods |
| |
| private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) { |
| |
| String amRMAddress = "1.2.3.4:1"; |
| String clientRMAddress = "1.2.3.4:2"; |
| String rmAdminAddress = "1.2.3.4:3"; |
| String webAppAddress = "1.2.3.4:4"; |
| |
| return SubClusterInfo.newInstance(subClusterId, amRMAddress, |
| clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW, |
| CLOCK.getTime(), "capability"); |
| } |
| |
| private SubClusterPolicyConfiguration createSCPolicyConf(String queueName, |
| String policyType) { |
| ByteBuffer bb = ByteBuffer.allocate(100); |
| bb.put((byte) 0x02); |
| return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb); |
| } |
| |
| private void addApplicationHomeSC(ApplicationId appId, |
| SubClusterId subClusterId) throws YarnException { |
| ApplicationHomeSubCluster ahsc = |
| ApplicationHomeSubCluster.newInstance(appId, subClusterId); |
| AddApplicationHomeSubClusterRequest request = |
| AddApplicationHomeSubClusterRequest.newInstance(ahsc); |
| stateStore.addApplicationHomeSubCluster(request); |
| } |
| |
| private void setPolicyConf(String queue, String policyType) |
| throws YarnException { |
| SetSubClusterPolicyConfigurationRequest request = |
| SetSubClusterPolicyConfigurationRequest |
| .newInstance(createSCPolicyConf(queue, policyType)); |
| stateStore.setPolicyConfiguration(request); |
| } |
| |
| private void registerSubCluster(SubClusterInfo subClusterInfo) |
| throws YarnException { |
| stateStore.registerSubCluster( |
| SubClusterRegisterRequest.newInstance(subClusterInfo)); |
| } |
| |
| private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId) |
| throws YarnException { |
| GetSubClusterInfoRequest request = |
| GetSubClusterInfoRequest.newInstance(subClusterId); |
| return stateStore.getSubCluster(request).getSubClusterInfo(); |
| } |
| |
| private SubClusterId queryApplicationHomeSC(ApplicationId appId) |
| throws YarnException { |
| GetApplicationHomeSubClusterRequest request = |
| GetApplicationHomeSubClusterRequest.newInstance(appId); |
| |
| GetApplicationHomeSubClusterResponse response = |
| stateStore.getApplicationHomeSubCluster(request); |
| |
| return response.getApplicationHomeSubCluster().getHomeSubCluster(); |
| } |
| |
| private SubClusterPolicyConfiguration queryPolicy(String queue) |
| throws YarnException { |
| GetSubClusterPolicyConfigurationRequest request = |
| GetSubClusterPolicyConfigurationRequest.newInstance(queue); |
| |
| GetSubClusterPolicyConfigurationResponse result = |
| stateStore.getPolicyConfiguration(request); |
| return result.getPolicyConfiguration(); |
| } |
| |
| protected void setConf(Configuration conf) { |
| this.conf = conf; |
| } |
| |
| protected Configuration getConf() { |
| return conf; |
| } |
| |
| } |