blob: e5e156dcf76c63b1ad0d78129bf4a5404806ad74 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.federation;
import java.io.IOException;
import java.io.StringReader;
import java.net.UnknownHostException;
import javax.xml.bind.JAXBException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONUnmarshaller;
/**
* Unit tests for FederationStateStoreService.
*/
public class TestFederationRMStateStoreService {
private final HAServiceProtocol.StateChangeRequestInfo requestInfo =
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
private final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
private final GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
private Configuration conf;
private FederationStateStore stateStore;
private long lastHearbeatTS = 0;
private JSONJAXBContext jc;
private JSONUnmarshaller unmarshaller;
@Before
public void setUp() throws IOException, YarnException, JAXBException {
conf = new YarnConfiguration();
jc = new JSONJAXBContext(
JSONConfiguration.mapped().rootUnwrapping(false).build(),
ClusterMetricsInfo.class);
unmarshaller = jc.createJSONUnmarshaller();
}
@After
public void tearDown() throws Exception {
unmarshaller = null;
jc = null;
}
@Test
public void testFederationStateStoreService() throws Exception {
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
final MockRM rm = new MockRM(conf);
// Initially there should be no entry for the sub-cluster
rm.init(conf);
stateStore = rm.getFederationStateStoreService().getStateStoreClient();
GetSubClusterInfoResponse response = stateStore.getSubCluster(request);
Assert.assertNull(response);
// Validate if sub-cluster is registered
rm.start();
String capability = checkSubClusterInfo(SubClusterState.SC_NEW);
Assert.assertTrue(capability.isEmpty());
// Heartbeat to see if sub-cluster transitions to running
FederationStateStoreHeartbeat storeHeartbeat =
rm.getFederationStateStoreService().getStateStoreHeartbeatThread();
storeHeartbeat.run();
capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
checkClusterMetricsInfo(capability, 0);
// heartbeat again after adding a node.
rm.registerNode("127.0.0.1:1234", 4 * 1024);
storeHeartbeat.run();
capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
checkClusterMetricsInfo(capability, 1);
// Validate sub-cluster deregistration
rm.getFederationStateStoreService()
.deregisterSubCluster(SubClusterDeregisterRequest
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
checkSubClusterInfo(SubClusterState.SC_UNREGISTERED);
// check after failover
explicitFailover(rm);
capability = checkSubClusterInfo(SubClusterState.SC_NEW);
Assert.assertTrue(capability.isEmpty());
// Heartbeat to see if sub-cluster transitions to running
storeHeartbeat =
rm.getFederationStateStoreService().getStateStoreHeartbeatThread();
storeHeartbeat.run();
capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
checkClusterMetricsInfo(capability, 0);
// heartbeat again after adding a node.
rm.registerNode("127.0.0.1:1234", 4 * 1024);
storeHeartbeat.run();
capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
checkClusterMetricsInfo(capability, 1);
rm.stop();
}
private void explicitFailover(MockRM rm) throws IOException {
rm.getAdminService().transitionToStandby(requestInfo);
Assert.assertTrue(rm.getRMContext()
.getHAServiceState() == HAServiceProtocol.HAServiceState.STANDBY);
rm.getAdminService().transitionToActive(requestInfo);
Assert.assertTrue(rm.getRMContext()
.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE);
lastHearbeatTS = 0;
stateStore = rm.getFederationStateStoreService().getStateStoreClient();
}
private void checkClusterMetricsInfo(String capability, int numNodes)
throws JAXBException {
ClusterMetricsInfo clusterMetricsInfo = unmarshaller.unmarshalFromJSON(
new StringReader(capability), ClusterMetricsInfo.class);
Assert.assertEquals(numNodes, clusterMetricsInfo.getTotalNodes());
}
private String checkSubClusterInfo(SubClusterState state)
throws YarnException, UnknownHostException {
Assert.assertNotNull(stateStore.getSubCluster(request));
SubClusterInfo response =
stateStore.getSubCluster(request).getSubClusterInfo();
Assert.assertEquals(state, response.getState());
Assert.assertTrue(response.getLastHeartBeat() >= lastHearbeatTS);
String expectedAddress =
(response.getClientRMServiceAddress().split(":"))[0];
Assert.assertEquals(expectedAddress,
(response.getAMRMServiceAddress().split(":"))[0]);
Assert.assertEquals(expectedAddress,
(response.getRMAdminServiceAddress().split(":"))[0]);
Assert.assertEquals(expectedAddress,
(response.getRMWebServiceAddress().split(":"))[0]);
lastHearbeatTS = response.getLastHeartBeat();
return response.getCapability();
}
}