blob: 258762682dfb8f4da53496b95786a7fa492c423d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.List;
import java.util.Set;
import java.util.HashSet;
import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Base class for testing FederationStateStore implementations. Subclasses
* supply the store under test through createStateStore().
*/
public abstract class FederationStateStoreBaseTest {
// Supplies the timestamp handed to SubClusterInfo.newInstance() in
// createSubClusterInfo().
private static final MonotonicClock CLOCK = new MonotonicClock();
// Store under test: assigned and initialized in before(), closed in after().
private FederationStateStore stateStore;
/**
* Provides the FederationStateStore implementation the tests run against.
*
* @return the store to test; before() calls init() on it.
*/
protected abstract FederationStateStore createStateStore();
// Configuration handed to stateStore.init(); assigned through setConf().
private Configuration conf;
/**
* Obtains the store under test from createStateStore() and initializes it
* with the current configuration before every test.
*
* @throws IOException declared for subclasses/overrides of the set-up.
* @throws YarnException declared for subclasses/overrides of the set-up.
*/
@Before
public void before() throws IOException, YarnException {
// Ask the concrete subclass for the store to exercise.
stateStore = createStateStore();
stateStore.init(conf);
}
/**
* Closes the store under test after every test.
*
* @throws Exception if closing the store fails.
*/
@After
public void after() throws Exception {
stateStore.close();
}
// Test FederationMembershipStateStore
/**
 * Registers a sub-cluster and checks that the stored record equals the one
 * submitted and that its last heartbeat lies between the wall-clock readings
 * taken immediately before and after the registration.
 */
@Test
public void testRegisterSubCluster() throws Exception {
  SubClusterId scId = SubClusterId.newInstance("SC");
  SubClusterInfo expectedInfo = createSubClusterInfo(scId);

  long lowerBound =
      Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
  SubClusterRegisterRequest registerRequest =
      SubClusterRegisterRequest.newInstance(expectedInfo);
  SubClusterRegisterResponse registerResponse =
      stateStore.registerSubCluster(registerRequest);
  long upperBound =
      Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();

  Assert.assertNotNull(registerResponse);
  Assert.assertEquals(expectedInfo, querySubClusterInfo(scId));

  // The heartbeat saved by the store must fall inside [lowerBound, upperBound].
  long heartBeatForUpperCheck = querySubClusterInfo(scId).getLastHeartBeat();
  Assert.assertTrue(heartBeatForUpperCheck <= upperBound);
  long heartBeatForLowerCheck = querySubClusterInfo(scId).getLastHeartBeat();
  Assert.assertTrue(heartBeatForLowerCheck >= lowerBound);
}
/**
 * Deregisters a previously registered sub-cluster and checks that its stored
 * state becomes SC_UNREGISTERED.
 */
@Test
public void testDeregisterSubCluster() throws Exception {
  SubClusterId scId = SubClusterId.newInstance("SC");
  SubClusterInfo registeredInfo = createSubClusterInfo(scId);
  registerSubCluster(registeredInfo);

  stateStore.deregisterSubCluster(SubClusterDeregisterRequest
      .newInstance(scId, SubClusterState.SC_UNREGISTERED));

  SubClusterState stateAfterDeregister = querySubClusterInfo(scId).getState();
  Assert.assertEquals(SubClusterState.SC_UNREGISTERED, stateAfterDeregister);
}
/**
 * Deregistering a sub-cluster that was never registered must be rejected with
 * a FederationStateStoreException mentioning "SubCluster SC not found".
 */
@Test
public void testDeregisterSubClusterUnknownSubCluster() throws Exception {
  SubClusterId subClusterId = SubClusterId.newInstance("SC");
  SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
      .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);

  // intercept() reports what was actually returned or thrown when the
  // expectation is not met, unlike a bare Assert.fail()/assertTrue pair.
  LambdaTestUtils.intercept(FederationStateStoreException.class,
      "SubCluster SC not found",
      () -> stateStore.deregisterSubCluster(deregisterRequest));
}
/**
 * Registers a sub-cluster and checks that getSubCluster() returns a record
 * equal to the one that was registered.
 */
@Test
public void testGetSubClusterInfo() throws Exception {
  SubClusterId scId = SubClusterId.newInstance("SC");
  SubClusterInfo expectedInfo = createSubClusterInfo(scId);
  registerSubCluster(expectedInfo);

  GetSubClusterInfoResponse lookup =
      stateStore.getSubCluster(GetSubClusterInfoRequest.newInstance(scId));
  SubClusterInfo storedInfo = lookup.getSubClusterInfo();

  Assert.assertEquals(expectedInfo, storedInfo);
}
/**
 * Looking up a sub-cluster that was never registered is expected to yield a
 * null response.
 */
@Test
public void testGetSubClusterInfoUnknownSubCluster() throws Exception {
  SubClusterId unknownId = SubClusterId.newInstance("SC");
  GetSubClusterInfoRequest lookupRequest =
      GetSubClusterInfoRequest.newInstance(unknownId);

  Assert.assertNull(stateStore.getSubCluster(lookupRequest));
}
/**
 * Registers two sub-clusters, heartbeats one as SC_RUNNING and the other as
 * SC_UNHEALTHY, and checks that the "active only" listing returns just the
 * running one while the unfiltered listing returns both.
 */
@Test
public void testGetAllSubClustersInfo() throws Exception {
  SubClusterId runningId = SubClusterId.newInstance("SC1");
  SubClusterInfo runningInfo = createSubClusterInfo(runningId);
  SubClusterId unhealthyId = SubClusterId.newInstance("SC2");
  SubClusterInfo unhealthyInfo = createSubClusterInfo(unhealthyId);

  SubClusterRegisterRequest registerRunning =
      SubClusterRegisterRequest.newInstance(runningInfo);
  stateStore.registerSubCluster(registerRunning);
  SubClusterRegisterRequest registerUnhealthy =
      SubClusterRegisterRequest.newInstance(unhealthyInfo);
  stateStore.registerSubCluster(registerUnhealthy);

  SubClusterHeartbeatRequest runningBeat = SubClusterHeartbeatRequest
      .newInstance(runningId, SubClusterState.SC_RUNNING, "capability");
  stateStore.subClusterHeartbeat(runningBeat);
  SubClusterHeartbeatRequest unhealthyBeat = SubClusterHeartbeatRequest
      .newInstance(unhealthyId, SubClusterState.SC_UNHEALTHY, "capability");
  stateStore.subClusterHeartbeat(unhealthyBeat);

  GetSubClustersInfoRequest activeOnly =
      GetSubClustersInfoRequest.newInstance(true);
  List<SubClusterInfo> activeSubClusters =
      stateStore.getSubClusters(activeOnly).getSubClusters();
  GetSubClustersInfoRequest unfiltered =
      GetSubClustersInfoRequest.newInstance(false);
  List<SubClusterInfo> allSubClusters =
      stateStore.getSubClusters(unfiltered).getSubClusters();

  // Only SC1 is active.
  Assert.assertEquals(1, activeSubClusters.size());
  SubClusterInfo onlyActive = activeSubClusters.get(0);
  Assert.assertEquals(runningId, onlyActive.getSubClusterId());

  // Both SC1 and SC2 are present in the store; once SC1 is taken out of the
  // unfiltered listing the remaining entry must be SC2.
  Assert.assertEquals(2, allSubClusters.size());
  Assert.assertTrue(allSubClusters.contains(onlyActive));
  allSubClusters.remove(onlyActive);
  SubClusterInfo remaining = allSubClusters.get(0);
  Assert.assertEquals(unhealthyId, remaining.getSubClusterId());
}
/**
 * Sends a heartbeat for a registered sub-cluster and checks that the stored
 * state becomes SC_RUNNING and that the stored heartbeat lies between the
 * previous heartbeat and the wall-clock reading taken after the call.
 */
@Test
public void testSubClusterHeartbeat() throws Exception {
  SubClusterId scId = SubClusterId.newInstance("SC");
  SubClusterInfo registeredInfo = createSubClusterInfo(scId);
  registerSubCluster(registeredInfo);

  long heartBeatBefore = querySubClusterInfo(scId).getLastHeartBeat();

  stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
      .newInstance(scId, SubClusterState.SC_RUNNING, "capability"));

  long upperBound =
      Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();

  SubClusterState stateAfterBeat = querySubClusterInfo(scId).getState();
  Assert.assertEquals(SubClusterState.SC_RUNNING, stateAfterBeat);

  // The saved heartbeat must fall inside [heartBeatBefore, upperBound].
  long heartBeatForUpperCheck = querySubClusterInfo(scId).getLastHeartBeat();
  Assert.assertTrue(heartBeatForUpperCheck <= upperBound);
  long heartBeatForLowerCheck = querySubClusterInfo(scId).getLastHeartBeat();
  Assert.assertTrue(heartBeatForLowerCheck >= heartBeatBefore);
}
/**
 * A heartbeat for a sub-cluster that was never registered must be rejected
 * with a FederationStateStoreException mentioning
 * "SubCluster SC does not exist; cannot heartbeat".
 */
@Test
public void testSubClusterHeartbeatUnknownSubCluster() throws Exception {
  SubClusterId subClusterId = SubClusterId.newInstance("SC");
  SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
      .newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability");

  // intercept() reports what was actually returned or thrown when the
  // expectation is not met, unlike a bare Assert.fail()/assertTrue pair.
  LambdaTestUtils.intercept(FederationStateStoreException.class,
      "SubCluster SC does not exist; cannot heartbeat",
      () -> stateStore.subClusterHeartbeat(heartbeatRequest));
}
// Test FederationApplicationHomeSubClusterStore
/**
 * Adds an application-to-home-sub-cluster mapping and checks that both the
 * add response and a later lookup report that sub-cluster.
 */
@Test
public void testAddApplicationHomeSubCluster() throws Exception {
  ApplicationId applicationId = ApplicationId.newInstance(1, 1);
  SubClusterId homeId = SubClusterId.newInstance("SC");

  AddApplicationHomeSubClusterRequest addRequest =
      AddApplicationHomeSubClusterRequest.newInstance(
          ApplicationHomeSubCluster.newInstance(applicationId, homeId));
  AddApplicationHomeSubClusterResponse addResponse =
      stateStore.addApplicationHomeSubCluster(addRequest);

  Assert.assertEquals(homeId, addResponse.getHomeSubCluster());
  SubClusterId storedHome = queryApplicationHomeSC(applicationId);
  Assert.assertEquals(homeId, storedHome);
}
/**
 * Adding a mapping for an application that already has a home sub-cluster
 * must keep the original home: both the response and a later lookup report
 * SC1 even though the second add asked for SC2.
 */
@Test
public void testAddApplicationHomeSubClusterAppAlreadyExists()
    throws Exception {
  ApplicationId applicationId = ApplicationId.newInstance(1, 1);
  SubClusterId originalHome = SubClusterId.newInstance("SC1");
  addApplicationHomeSC(applicationId, originalHome);

  SubClusterId competingHome = SubClusterId.newInstance("SC2");
  ApplicationHomeSubCluster competingMapping =
      ApplicationHomeSubCluster.newInstance(applicationId, competingHome);
  AddApplicationHomeSubClusterRequest secondAdd =
      AddApplicationHomeSubClusterRequest.newInstance(competingMapping);
  AddApplicationHomeSubClusterResponse secondResponse =
      stateStore.addApplicationHomeSubCluster(secondAdd);

  Assert.assertEquals(originalHome, secondResponse.getHomeSubCluster());
  SubClusterId storedHome = queryApplicationHomeSC(applicationId);
  Assert.assertEquals(originalHome, storedHome);
}
/**
 * Adding the very same application/home mapping twice must succeed and keep
 * reporting that home sub-cluster.
 */
@Test
public void testAddApplicationHomeSubClusterAppAlreadyExistsInTheSameSC()
    throws Exception {
  ApplicationId applicationId = ApplicationId.newInstance(1, 1);
  SubClusterId homeId = SubClusterId.newInstance("SC1");
  addApplicationHomeSC(applicationId, homeId);

  ApplicationHomeSubCluster sameMapping =
      ApplicationHomeSubCluster.newInstance(applicationId, homeId);
  AddApplicationHomeSubClusterRequest repeatedAdd =
      AddApplicationHomeSubClusterRequest.newInstance(sameMapping);
  AddApplicationHomeSubClusterResponse repeatedResponse =
      stateStore.addApplicationHomeSubCluster(repeatedAdd);

  Assert.assertEquals(homeId, repeatedResponse.getHomeSubCluster());
  SubClusterId storedHome = queryApplicationHomeSC(applicationId);
  Assert.assertEquals(homeId, storedHome);
}
/**
 * Deletes an existing application mapping and checks that the delete returns
 * a response and that a later lookup of the application is rejected with a
 * FederationStateStoreException saying the application does not exist.
 */
@Test
public void testDeleteApplicationHomeSubCluster() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  SubClusterId subClusterId = SubClusterId.newInstance("SC");
  addApplicationHomeSC(appId, subClusterId);

  DeleteApplicationHomeSubClusterRequest delRequest =
      DeleteApplicationHomeSubClusterRequest.newInstance(appId);
  DeleteApplicationHomeSubClusterResponse response =
      stateStore.deleteApplicationHomeSubCluster(delRequest);
  Assert.assertNotNull(response);

  // intercept() reports what was actually returned or thrown when the
  // expectation is not met, unlike a bare Assert.fail()/assertTrue pair.
  LambdaTestUtils.intercept(FederationStateStoreException.class,
      "Application " + appId + " does not exist",
      () -> queryApplicationHomeSC(appId));
}
/**
 * Deleting the mapping of an application that was never added must be
 * rejected with a FederationStateStoreException saying the application does
 * not exist.
 */
@Test
public void testDeleteApplicationHomeSubClusterUnknownApp() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  DeleteApplicationHomeSubClusterRequest delRequest =
      DeleteApplicationHomeSubClusterRequest.newInstance(appId);

  // intercept() reports what was actually returned or thrown when the
  // expectation is not met, unlike a bare Assert.fail()/assertTrue pair.
  LambdaTestUtils.intercept(FederationStateStoreException.class,
      "Application " + appId.toString() + " does not exist",
      () -> stateStore.deleteApplicationHomeSubCluster(delRequest));
}
/**
 * Adds an application mapping and checks that the lookup response carries
 * both the application id and the home sub-cluster that were stored.
 */
@Test
public void testGetApplicationHomeSubCluster() throws Exception {
  ApplicationId applicationId = ApplicationId.newInstance(1, 1);
  SubClusterId homeId = SubClusterId.newInstance("SC");
  addApplicationHomeSC(applicationId, homeId);

  GetApplicationHomeSubClusterResponse lookup =
      stateStore.getApplicationHomeSubCluster(
          GetApplicationHomeSubClusterRequest.newInstance(applicationId));

  ApplicationId storedAppId =
      lookup.getApplicationHomeSubCluster().getApplicationId();
  Assert.assertEquals(applicationId, storedAppId);
  SubClusterId storedHome =
      lookup.getApplicationHomeSubCluster().getHomeSubCluster();
  Assert.assertEquals(homeId, storedHome);
}
/**
 * Looking up the home sub-cluster of an application that was never added must
 * be rejected with a FederationStateStoreException saying the application
 * does not exist.
 */
@Test
public void testGetApplicationHomeSubClusterUnknownApp() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  GetApplicationHomeSubClusterRequest request =
      GetApplicationHomeSubClusterRequest.newInstance(appId);

  // intercept() reports what was actually returned or thrown when the
  // expectation is not met, unlike a bare Assert.fail()/assertTrue pair.
  LambdaTestUtils.intercept(FederationStateStoreException.class,
      "Application " + appId.toString() + " does not exist",
      () -> stateStore.getApplicationHomeSubCluster(request));
}
/**
 * Adds two application mappings and checks that the bulk listing returns
 * exactly those two entries.
 */
@Test
public void testGetApplicationsHomeSubCluster() throws Exception {
  ApplicationId firstApp = ApplicationId.newInstance(1, 1);
  SubClusterId firstHome = SubClusterId.newInstance("SC1");
  ApplicationHomeSubCluster firstMapping =
      ApplicationHomeSubCluster.newInstance(firstApp, firstHome);

  ApplicationId secondApp = ApplicationId.newInstance(1, 2);
  SubClusterId secondHome = SubClusterId.newInstance("SC2");
  ApplicationHomeSubCluster secondMapping =
      ApplicationHomeSubCluster.newInstance(secondApp, secondHome);

  addApplicationHomeSC(firstApp, firstHome);
  addApplicationHomeSC(secondApp, secondHome);

  GetApplicationsHomeSubClusterResponse listing =
      stateStore.getApplicationsHomeSubCluster(
          GetApplicationsHomeSubClusterRequest.newInstance());

  Assert.assertEquals(2, listing.getAppsHomeSubClusters().size());
  Assert.assertTrue(listing.getAppsHomeSubClusters().contains(firstMapping));
  Assert.assertTrue(listing.getAppsHomeSubClusters().contains(secondMapping));
}
/**
 * Updates the home sub-cluster of an existing application from SC1 to SC2 and
 * checks that a later lookup reports SC2.
 */
@Test
public void testUpdateApplicationHomeSubCluster() throws Exception {
  ApplicationId applicationId = ApplicationId.newInstance(1, 1);
  SubClusterId originalHome = SubClusterId.newInstance("SC1");
  addApplicationHomeSC(applicationId, originalHome);

  SubClusterId newHome = SubClusterId.newInstance("SC2");
  ApplicationHomeSubCluster updatedMapping =
      ApplicationHomeSubCluster.newInstance(applicationId, newHome);
  UpdateApplicationHomeSubClusterResponse updateResponse =
      stateStore.updateApplicationHomeSubCluster(
          UpdateApplicationHomeSubClusterRequest.newInstance(updatedMapping));

  Assert.assertNotNull(updateResponse);
  SubClusterId storedHome = queryApplicationHomeSC(applicationId);
  Assert.assertEquals(newHome, storedHome);
}
/**
 * Updating the mapping of an application that was never added must be
 * rejected with a FederationStateStoreException saying the application does
 * not exist.
 */
@Test
public void testUpdateApplicationHomeSubClusterUnknownApp() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
  ApplicationHomeSubCluster ahsc =
      ApplicationHomeSubCluster.newInstance(appId, subClusterId1);
  UpdateApplicationHomeSubClusterRequest updateRequest =
      UpdateApplicationHomeSubClusterRequest.newInstance(ahsc);

  // intercept() reports what was actually returned or thrown when the
  // expectation is not met, unlike a bare Assert.fail()/assertTrue pair.
  LambdaTestUtils.intercept(FederationStateStoreException.class,
      "Application " + appId.toString() + " does not exist",
      () -> stateStore.updateApplicationHomeSubCluster(updateRequest));
}
// Test FederationPolicyStore
/**
 * Stores a policy configuration for queue "Queue" and checks that a later
 * lookup returns an equal configuration.
 */
@Test
public void testSetPolicyConfiguration() throws Exception {
  SubClusterPolicyConfiguration submitted =
      createSCPolicyConf("Queue", "PolicyType");
  SetSubClusterPolicyConfigurationResponse setResponse =
      stateStore.setPolicyConfiguration(
          SetSubClusterPolicyConfigurationRequest.newInstance(submitted));

  Assert.assertNotNull(setResponse);
  SubClusterPolicyConfiguration expected =
      createSCPolicyConf("Queue", "PolicyType");
  Assert.assertEquals(expected, queryPolicy("Queue"));
}
/**
 * Stores a policy for queue "Queue", overwrites it with a different policy
 * type and checks that the lookup returns the second configuration.
 */
@Test
public void testSetPolicyConfigurationUpdateExisting() throws Exception {
  setPolicyConf("Queue", "PolicyType1");

  SubClusterPolicyConfiguration replacement =
      createSCPolicyConf("Queue", "PolicyType2");
  SetSubClusterPolicyConfigurationResponse overwriteResponse =
      stateStore.setPolicyConfiguration(
          SetSubClusterPolicyConfigurationRequest.newInstance(replacement));

  Assert.assertNotNull(overwriteResponse);
  SubClusterPolicyConfiguration expected =
      createSCPolicyConf("Queue", "PolicyType2");
  Assert.assertEquals(expected, queryPolicy("Queue"));
}
/**
 * Stores a policy for queue "Queue" and checks that getPolicyConfiguration()
 * returns a non-null response carrying an equal configuration.
 */
@Test
public void testGetPolicyConfiguration() throws Exception {
  setPolicyConf("Queue", "PolicyType");

  GetSubClusterPolicyConfigurationResponse lookup =
      stateStore.getPolicyConfiguration(
          GetSubClusterPolicyConfigurationRequest.newInstance("Queue"));

  Assert.assertNotNull(lookup);
  SubClusterPolicyConfiguration expected =
      createSCPolicyConf("Queue", "PolicyType");
  Assert.assertEquals(expected, lookup.getPolicyConfiguration());
}
/**
 * Looking up the policy of a queue that has none stored is expected to yield
 * a null response.
 */
@Test
public void testGetPolicyConfigurationUnknownQueue() throws Exception {
  GetSubClusterPolicyConfigurationRequest lookupRequest =
      GetSubClusterPolicyConfigurationRequest.newInstance("Queue");

  Assert.assertNull(stateStore.getPolicyConfiguration(lookupRequest));
}
/**
 * Stores policies for two queues and checks that the bulk listing returns
 * exactly those two configurations.
 */
@Test
public void testGetPoliciesConfigurations() throws Exception {
  setPolicyConf("Queue1", "PolicyType1");
  setPolicyConf("Queue2", "PolicyType2");

  GetSubClusterPoliciesConfigurationsRequest listRequest =
      GetSubClusterPoliciesConfigurationsRequest.newInstance();
  GetSubClusterPoliciesConfigurationsResponse listing =
      stateStore.getPoliciesConfigurations(listRequest);

  Assert.assertNotNull(listing);
  Assert.assertNotNull(listing.getPoliciesConfigs());
  Assert.assertEquals(2, listing.getPoliciesConfigs().size());

  SubClusterPolicyConfiguration firstExpected =
      createSCPolicyConf("Queue1", "PolicyType1");
  Assert.assertTrue(listing.getPoliciesConfigs().contains(firstExpected));
  SubClusterPolicyConfiguration secondExpected =
      createSCPolicyConf("Queue2", "PolicyType2");
  Assert.assertTrue(listing.getPoliciesConfigs().contains(secondExpected));
}
// Convenience methods
/**
 * Builds a SubClusterInfo fixture for the given id with fixed service
 * addresses, state SC_NEW, the current CLOCK reading and the capability
 * string "capability".
 *
 * @param subClusterId id of the sub-cluster the fixture describes.
 * @return the newly created fixture.
 */
SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
  return SubClusterInfo.newInstance(
      subClusterId,
      "1.2.3.4:1",  // AM-RM address
      "1.2.3.4:2",  // client-RM address
      "1.2.3.4:3",  // RM admin address
      "1.2.3.4:4",  // web app address
      SubClusterState.SC_NEW,
      CLOCK.getTime(),
      "capability");
}
/**
 * Builds a policy configuration fixture for the given queue and policy type
 * whose parameters consist of the single marker byte 0x02.
 *
 * @param queueName queue the policy applies to.
 * @param policyType policy type name to record.
 * @return the newly created fixture.
 */
private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
    String policyType) {
  ByteBuffer bb = ByteBuffer.allocate(100);
  bb.put((byte) 0x02);
  // put() advances the position past the written byte; flip() makes the
  // written byte the buffer's readable content. Without it the readable
  // region is the untouched remainder and the marker byte is never seen.
  bb.flip();
  return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb);
}
/**
 * Stores a mapping from the application to the given home sub-cluster.
 *
 * @param appId application to map.
 * @param subClusterId home sub-cluster to record for the application.
 * @throws YarnException if the store rejects the request.
 */
void addApplicationHomeSC(ApplicationId appId,
    SubClusterId subClusterId) throws YarnException {
  stateStore.addApplicationHomeSubCluster(
      AddApplicationHomeSubClusterRequest.newInstance(
          ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
}
/**
 * Stores a fixture policy configuration for the given queue.
 *
 * @param queue queue the policy applies to.
 * @param policyType policy type name to record.
 * @throws YarnException if the store rejects the request.
 */
private void setPolicyConf(String queue, String policyType)
    throws YarnException {
  SubClusterPolicyConfiguration policy = createSCPolicyConf(queue, policyType);
  SetSubClusterPolicyConfigurationRequest setRequest =
      SetSubClusterPolicyConfigurationRequest.newInstance(policy);
  stateStore.setPolicyConfiguration(setRequest);
}
/**
 * Registers the given sub-cluster with the store under test.
 *
 * @param subClusterInfo description of the sub-cluster to register.
 * @throws YarnException if the store rejects the request.
 */
private void registerSubCluster(SubClusterInfo subClusterInfo)
    throws YarnException {
  SubClusterRegisterRequest registerRequest =
      SubClusterRegisterRequest.newInstance(subClusterInfo);
  stateStore.registerSubCluster(registerRequest);
}
/**
 * Fetches the stored record of a sub-cluster.
 *
 * @param subClusterId id of the sub-cluster to look up.
 * @return the SubClusterInfo carried by the store's response.
 * @throws YarnException if the store rejects the request.
 */
SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
    throws YarnException {
  GetSubClusterInfoResponse lookup = stateStore.getSubCluster(
      GetSubClusterInfoRequest.newInstance(subClusterId));
  return lookup.getSubClusterInfo();
}
/**
 * Fetches the home sub-cluster stored for an application.
 *
 * @param appId application to look up.
 * @return the home sub-cluster carried by the store's response.
 * @throws YarnException if the store rejects the request.
 */
SubClusterId queryApplicationHomeSC(ApplicationId appId)
    throws YarnException {
  ApplicationHomeSubCluster mapping = stateStore
      .getApplicationHomeSubCluster(
          GetApplicationHomeSubClusterRequest.newInstance(appId))
      .getApplicationHomeSubCluster();
  return mapping.getHomeSubCluster();
}
/**
 * Fetches the policy configuration stored for a queue.
 *
 * @param queue queue to look up.
 * @return the policy configuration carried by the store's response.
 * @throws YarnException if the store rejects the request.
 */
private SubClusterPolicyConfiguration queryPolicy(String queue)
    throws YarnException {
  return stateStore
      .getPolicyConfiguration(
          GetSubClusterPolicyConfigurationRequest.newInstance(queue))
      .getPolicyConfiguration();
}
/**
* Sets the configuration that before() passes to the store's init().
*
* @param conf configuration to use; the reference is stored as-is.
*/
protected void setConf(Configuration conf) {
this.conf = conf;
}
/**
* Returns the configuration previously assigned through setConf().
*
* @return the stored configuration reference.
*/
protected Configuration getConf() {
return conf;
}
/**
* Returns the store under test that before() created.
*
* @return the current store instance.
*/
protected FederationStateStore getStateStore() {
return stateStore;
}
/**
 * Fetches the home sub-cluster stored for a reservation.
 *
 * @param reservationId reservation to look up.
 * @return the home sub-cluster carried by the store's response.
 * @throws YarnException if the store rejects the request.
 */
SubClusterId queryReservationHomeSC(ReservationId reservationId)
    throws YarnException {
  ReservationHomeSubCluster mapping = stateStore
      .getReservationHomeSubCluster(
          GetReservationHomeSubClusterRequest.newInstance(reservationId))
      .getReservationHomeSubCluster();
  return mapping.getHomeSubCluster();
}
/**
 * Adds a reservation-to-home-sub-cluster mapping and checks that both the add
 * response and a later lookup report that sub-cluster.
 */
@Test
public void testAddReservationHomeSubCluster() throws Exception {
  ReservationId resId = ReservationId.newInstance(Time.now(), 1);
  SubClusterId homeId = SubClusterId.newInstance("SC");

  AddReservationHomeSubClusterRequest addRequest =
      AddReservationHomeSubClusterRequest.newInstance(
          ReservationHomeSubCluster.newInstance(resId, homeId));
  AddReservationHomeSubClusterResponse addResponse =
      stateStore.addReservationHomeSubCluster(addRequest);

  Assert.assertEquals(homeId, addResponse.getHomeSubCluster());
  SubClusterId storedHome = queryReservationHomeSC(resId);
  Assert.assertEquals(homeId, storedHome);
}
/**
 * Stores a mapping from the reservation to the given home sub-cluster.
 *
 * @param reservationId reservation to map.
 * @param subClusterId home sub-cluster to record for the reservation.
 * @throws YarnException if the store rejects the request.
 */
private void addReservationHomeSC(ReservationId reservationId, SubClusterId subClusterId)
    throws YarnException {
  stateStore.addReservationHomeSubCluster(
      AddReservationHomeSubClusterRequest.newInstance(
          ReservationHomeSubCluster.newInstance(reservationId, subClusterId)));
}
/**
 * Adding a mapping for a reservation that already has a home sub-cluster must
 * keep the original home: both the response and a later lookup report SC1
 * even though the second add asked for SC2.
 */
@Test
public void testAddReservationHomeSubClusterReservationAlreadyExists() throws Exception {
  ReservationId resId = ReservationId.newInstance(Time.now(), 1);
  SubClusterId originalHome = SubClusterId.newInstance("SC1");
  addReservationHomeSC(resId, originalHome);

  SubClusterId competingHome = SubClusterId.newInstance("SC2");
  AddReservationHomeSubClusterRequest secondAdd =
      AddReservationHomeSubClusterRequest.newInstance(
          ReservationHomeSubCluster.newInstance(resId, competingHome));
  AddReservationHomeSubClusterResponse secondResponse =
      stateStore.addReservationHomeSubCluster(secondAdd);

  Assert.assertNotNull(secondResponse);
  Assert.assertEquals(originalHome, secondResponse.getHomeSubCluster());
  SubClusterId storedHome = queryReservationHomeSC(resId);
  Assert.assertEquals(originalHome, storedHome);
}
/**
 * Adding the very same reservation/home mapping twice must succeed and keep
 * reporting that home sub-cluster.
 */
@Test
public void testAddReservationHomeSubClusterAppAlreadyExistsInTheSameSC()
    throws Exception {
  ReservationId resId = ReservationId.newInstance(Time.now(), 1);
  SubClusterId homeId = SubClusterId.newInstance("SC1");
  addReservationHomeSC(resId, homeId);

  AddReservationHomeSubClusterRequest repeatedAdd =
      AddReservationHomeSubClusterRequest.newInstance(
          ReservationHomeSubCluster.newInstance(resId, homeId));
  AddReservationHomeSubClusterResponse repeatedResponse =
      stateStore.addReservationHomeSubCluster(repeatedAdd);

  Assert.assertNotNull(repeatedResponse);
  Assert.assertEquals(homeId, repeatedResponse.getHomeSubCluster());
  SubClusterId storedHome = queryReservationHomeSC(resId);
  Assert.assertEquals(homeId, storedHome);
}
/**
 * Deletes an existing reservation mapping and checks that the delete returns
 * a response and that a later lookup fails with a YarnException saying the
 * reservation does not exist.
 */
@Test
public void testDeleteReservationHomeSubCluster() throws Exception {
  ReservationId resId = ReservationId.newInstance(Time.now(), 1);
  SubClusterId homeId = SubClusterId.newInstance("SC");
  addReservationHomeSC(resId, homeId);

  DeleteReservationHomeSubClusterResponse deleteResponse =
      stateStore.deleteReservationHomeSubCluster(
          DeleteReservationHomeSubClusterRequest.newInstance(resId));
  Assert.assertNotNull(deleteResponse);

  String expectedText = "Reservation " + resId + " does not exist";
  LambdaTestUtils.intercept(YarnException.class, expectedText,
      () -> queryReservationHomeSC(resId));
}
/**
 * Deleting the mapping of a reservation that was never added must fail with a
 * YarnException saying the reservation does not exist.
 */
@Test
public void testDeleteReservationHomeSubClusterUnknownApp() throws Exception {
  ReservationId resId = ReservationId.newInstance(Time.now(), 1);
  DeleteReservationHomeSubClusterRequest deleteRequest =
      DeleteReservationHomeSubClusterRequest.newInstance(resId);

  String expectedText = "Reservation " + resId + " does not exist";
  LambdaTestUtils.intercept(YarnException.class, expectedText,
      () -> stateStore.deleteReservationHomeSubCluster(deleteRequest));
}
/**
 * Updates the home sub-cluster of an existing reservation from SC to SC2 and
 * checks that a later lookup reports SC2.
 */
@Test
public void testUpdateReservationHomeSubCluster() throws Exception {
  ReservationId resId = ReservationId.newInstance(Time.now(), 1);
  SubClusterId originalHome = SubClusterId.newInstance("SC");
  addReservationHomeSC(resId, originalHome);

  SubClusterId newHome = SubClusterId.newInstance("SC2");
  ReservationHomeSubCluster updatedMapping =
      ReservationHomeSubCluster.newInstance(resId, newHome);
  UpdateReservationHomeSubClusterResponse updateResponse =
      stateStore.updateReservationHomeSubCluster(
          UpdateReservationHomeSubClusterRequest.newInstance(updatedMapping));

  Assert.assertNotNull(updateResponse);
  SubClusterId storedHome = queryReservationHomeSC(resId);
  Assert.assertEquals(newHome, storedHome);
}
/**
 * Updating the mapping of a reservation that was never added must fail with a
 * YarnException saying the reservation does not exist.
 */
@Test
public void testUpdateReservationHomeSubClusterUnknownApp() throws Exception {
  ReservationId resId = ReservationId.newInstance(Time.now(), 1);
  SubClusterId homeId = SubClusterId.newInstance("SC1");
  UpdateReservationHomeSubClusterRequest updateRequest =
      UpdateReservationHomeSubClusterRequest.newInstance(
          ReservationHomeSubCluster.newInstance(resId, homeId));

  String expectedText = "Reservation " + resId + " does not exist";
  LambdaTestUtils.intercept(YarnException.class, expectedText,
      () -> stateStore.updateReservationHomeSubCluster(updateRequest));
}
/**
 * Stores a router master key built from a DelegationKey and checks that the
 * response echoes the same key id, key bytes and expiry date.
 */
@Test
public void testStoreNewMasterKey() throws Exception {
  // Use an explicit charset so the key material does not depend on the
  // platform default encoding.
  DelegationKey key = new DelegationKey(1234, 4321,
      "keyBytes".getBytes(java.nio.charset.StandardCharsets.UTF_8));
  RouterMasterKey routerMasterKey = RouterMasterKey.newInstance(key.getKeyId(),
      ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate());
  RouterMasterKeyRequest routerMasterKeyRequest =
      RouterMasterKeyRequest.newInstance(routerMasterKey);

  RouterMasterKeyResponse response = stateStore.storeNewMasterKey(routerMasterKeyRequest);
  Assert.assertNotNull(response);

  RouterMasterKey routerMasterKeyResp = response.getRouterMasterKey();
  Assert.assertNotNull(routerMasterKeyResp);
  Assert.assertEquals(routerMasterKey.getKeyId(), routerMasterKeyResp.getKeyId());
  Assert.assertEquals(routerMasterKey.getKeyBytes(), routerMasterKeyResp.getKeyBytes());
  Assert.assertEquals(routerMasterKey.getExpiryDate(), routerMasterKeyResp.getExpiryDate());
}
/**
 * Stores a router master key, reads it back through
 * getMasterKeyByDelegationKey() and checks that the returned key has the same
 * key id, key bytes and expiry date.
 */
@Test
public void testGetMasterKeyByDelegationKey() throws YarnException, IOException {
  // Use an explicit charset so the key material does not depend on the
  // platform default encoding.
  DelegationKey key = new DelegationKey(5678, 8765,
      "keyBytes".getBytes(java.nio.charset.StandardCharsets.UTF_8));
  RouterMasterKey routerMasterKey = RouterMasterKey.newInstance(key.getKeyId(),
      ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate());
  RouterMasterKeyRequest routerMasterKeyRequest =
      RouterMasterKeyRequest.newInstance(routerMasterKey);

  RouterMasterKeyResponse response = stateStore.storeNewMasterKey(routerMasterKeyRequest);
  Assert.assertNotNull(response);

  RouterMasterKeyResponse routerMasterKeyResponse =
      stateStore.getMasterKeyByDelegationKey(routerMasterKeyRequest);
  Assert.assertNotNull(routerMasterKeyResponse);

  RouterMasterKey routerMasterKeyResp = routerMasterKeyResponse.getRouterMasterKey();
  Assert.assertNotNull(routerMasterKeyResp);
  Assert.assertEquals(routerMasterKey.getKeyId(), routerMasterKeyResp.getKeyId());
  Assert.assertEquals(routerMasterKey.getKeyBytes(), routerMasterKeyResp.getKeyBytes());
  Assert.assertEquals(routerMasterKey.getExpiryDate(), routerMasterKeyResp.getExpiryDate());
}
/**
 * Stores a router master key, removes it and checks that the removal response
 * carries a key with the same key id, key bytes and expiry date.
 */
@Test
public void testRemoveStoredMasterKey() throws YarnException, IOException {
  // Use an explicit charset so the key material does not depend on the
  // platform default encoding.
  DelegationKey key = new DelegationKey(1234, 4321,
      "keyBytes".getBytes(java.nio.charset.StandardCharsets.UTF_8));
  RouterMasterKey routerMasterKey = RouterMasterKey.newInstance(key.getKeyId(),
      ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate());
  RouterMasterKeyRequest routerMasterKeyRequest =
      RouterMasterKeyRequest.newInstance(routerMasterKey);

  RouterMasterKeyResponse response = stateStore.storeNewMasterKey(routerMasterKeyRequest);
  Assert.assertNotNull(response);

  RouterMasterKeyResponse masterKeyResponse =
      stateStore.removeStoredMasterKey(routerMasterKeyRequest);
  Assert.assertNotNull(masterKeyResponse);

  RouterMasterKey routerMasterKeyResp = masterKeyResponse.getRouterMasterKey();
  // Fail with a clear assertion instead of a NullPointerException when the
  // response carries no key, matching the sibling master-key tests.
  Assert.assertNotNull(routerMasterKeyResp);
  Assert.assertEquals(routerMasterKey.getKeyId(), routerMasterKeyResp.getKeyId());
  Assert.assertEquals(routerMasterKey.getKeyBytes(), routerMasterKeyResp.getKeyBytes());
  Assert.assertEquals(routerMasterKey.getExpiryDate(), routerMasterKeyResp.getExpiryDate());
}
}