blob: 18dfdc27d86fae8b8f4860750ea14dd4826fcc25 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.impl;
import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
/**
 * ZooKeeper implementation of {@link FederationStateStore}.
 *
 * The znode structure is as follows:
 * <pre>
 * ROOT_DIR_PATH
 * |--- MEMBERSHIP
 * |     |----- SC1
 * |     |----- SC2
 * |--- APPLICATION
 * |     |----- APP1
 * |     |----- APP2
 * |--- POLICY
 * |     |----- QUEUE1
 * |     |----- QUEUE2
 * |--- RESERVATION
 * |     |----- RESERVATION1
 * |     |----- RESERVATION2
 * </pre>
 *
 * Each leaf znode holds a serialized protobuf:
 * <ul>
 *   <li>{@code SubClusterInfoProto} for memberships.</li>
 *   <li>{@code SubClusterIdProto} for applications and reservations.</li>
 *   <li>{@code SubClusterPolicyConfigurationProto} for policies.</li>
 * </ul>
 */
public class ZookeeperFederationStateStore implements FederationStateStore {

  private static final Logger LOG =
      LoggerFactory.getLogger(ZookeeperFederationStateStore.class);

  // Names of the per-entity child znodes under the base znode. These are
  // persisted paths; changing them would orphan existing state.
  private static final String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
  private static final String ROOT_ZNODE_NAME_APPLICATION = "applications";
  private static final String ROOT_ZNODE_NAME_POLICY = "policies";
  private static final String ROOT_ZNODE_NAME_RESERVATION = "reservation";

  /** Interface to Zookeeper. */
  private ZKCuratorManager zkManager;

  /** Directory to store the state store data. */
  private String baseZNode;

  private String appsZNode;
  private String membershipZNode;
  private String policiesZNode;
  private String reservationsZNode;

  /** Clock used to time operations for {@link #opDurations}. */
  private volatile Clock clock = SystemClock.getInstance();

  @VisibleForTesting
  private ZKFederationStateStoreOpDurations opDurations =
      ZKFederationStateStoreOpDurations.getInstance();

  /**
   * Connects to ZooKeeper and makes sure the base znode of every entity
   * (memberships, applications, policies, reservations) exists.
   *
   * @param conf configuration providing the ZK connection settings, ACLs and
   *          the parent path ({@code FEDERATION_STATESTORE_ZK_PARENT_PATH}).
   * @throws YarnException if the ZK connection cannot be established or the
   *           base znodes cannot be created.
   */
  @Override
  public void init(Configuration conf) throws YarnException {
    LOG.info("Initializing ZooKeeper connection");

    baseZNode = conf.get(
        YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH,
        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH);
    try {
      this.zkManager = new ZKCuratorManager(conf);
      this.zkManager.start();
    } catch (IOException e) {
      // Fail fast. Continuing without a live connection only surfaces later as
      // a NullPointerException reported as "Cannot create base directories".
      String errMsg = "Cannot initialize the ZK connection: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    // Base znodes
    membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
    appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
    policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
    reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);

    // Create base znode for each entity
    try {
      List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
      zkManager.createRootDirRecursively(membershipZNode, zkAcl);
      zkManager.createRootDirRecursively(appsZNode, zkAcl);
      zkManager.createRootDirRecursively(policiesZNode, zkAcl);
      zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
    } catch (Exception e) {
      String errMsg = "Cannot create base directories: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
  }

  /**
   * Closes the ZooKeeper connection, if one was created.
   *
   * @throws Exception if closing the ZK manager fails.
   */
  @Override
  public void close() throws Exception {
    if (zkManager != null) {
      zkManager.close();
    }
  }

  /**
   * Records the home subcluster of an application, unless one is already
   * stored. The existing mapping always wins.
   *
   * @param request the application and its proposed home subcluster.
   * @return a response carrying the home subcluster actually stored, which
   *         may differ from the requested one.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
      AddApplicationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
    ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
    ApplicationId appId = app.getApplicationId();

    // Try to write the subcluster (update=false keeps an existing mapping)
    SubClusterId homeSubCluster = app.getHomeSubCluster();
    try {
      putApp(appId, homeSubCluster, false);
    } catch (Exception e) {
      String errMsg = "Cannot add application home subcluster for " + appId
          + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    // Check for the actual subcluster: read back what is stored now
    try {
      homeSubCluster = getApp(appId);
    } catch (Exception e) {
      String errMsg = "Cannot check app home subcluster for " + appId
          + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    long end = clock.getTime();
    opDurations.addAppHomeSubClusterDuration(start, end);
    return AddApplicationHomeSubClusterResponse.newInstance(homeSubCluster);
  }

  /**
   * Overwrites the home subcluster of an application that is already stored.
   *
   * @param request the application and its new home subcluster.
   * @return an empty response.
   * @throws YarnException if the request is invalid, the application does not
   *           exist, or ZooKeeper fails.
   */
  @Override
  public UpdateApplicationHomeSubClusterResponse
      updateApplicationHomeSubCluster(
          UpdateApplicationHomeSubClusterRequest request)
          throws YarnException {

    long start = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
    ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
    ApplicationId appId = app.getApplicationId();
    SubClusterId homeSubCluster = getApp(appId);
    if (homeSubCluster == null) {
      String errMsg = "Application " + appId + " does not exist";
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    SubClusterId newSubClusterId = app.getHomeSubCluster();
    putApp(appId, newSubClusterId, true);

    long end = clock.getTime();
    opDurations.addUpdateAppHomeSubClusterDuration(start, end);
    return UpdateApplicationHomeSubClusterResponse.newInstance();
  }

  /**
   * Looks up the home subcluster of one application.
   *
   * @param request the application to look up.
   * @return the application together with its home subcluster.
   * @throws YarnException if the request is invalid, the application does not
   *           exist, or ZooKeeper fails.
   */
  @Override
  public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
      GetApplicationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
    ApplicationId appId = request.getApplicationId();
    SubClusterId homeSubCluster = getApp(appId);
    if (homeSubCluster == null) {
      String errMsg = "Application " + appId + " does not exist";
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addGetAppHomeSubClusterDuration(start, end);
    return GetApplicationHomeSubClusterResponse.newInstance(appId, homeSubCluster);
  }

  /**
   * Lists every application with its home subcluster. Applications whose
   * znode disappears between listing and reading are skipped.
   *
   * @param request unused by this implementation.
   * @return all application-to-home-subcluster mappings found.
   * @throws YarnException if ZooKeeper fails or a child name is not a valid
   *           application id.
   */
  @Override
  public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
      GetApplicationsHomeSubClusterRequest request) throws YarnException {
    long start = clock.getTime();
    List<ApplicationHomeSubCluster> result = new ArrayList<>();

    try {
      for (String child : zkManager.getChildren(appsZNode)) {
        ApplicationId appId = ApplicationId.fromString(child);
        SubClusterId homeSubCluster = getApp(appId);
        if (homeSubCluster == null) {
          // Removed concurrently; do not report a mapping without a home.
          LOG.debug("Application {} was removed while listing", appId);
          continue;
        }
        result.add(ApplicationHomeSubCluster.newInstance(appId, homeSubCluster));
      }
    } catch (Exception e) {
      String errMsg = "Cannot get apps: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addGetAppsHomeSubClusterDuration(start, end);
    return GetApplicationsHomeSubClusterResponse.newInstance(result);
  }

  /**
   * Removes the home subcluster mapping of an application.
   *
   * @param request the application to remove.
   * @return an empty response.
   * @throws YarnException if the request is invalid, the application does not
   *           exist, or ZooKeeper fails.
   */
  @Override
  public DeleteApplicationHomeSubClusterResponse
      deleteApplicationHomeSubCluster(
          DeleteApplicationHomeSubClusterRequest request)
          throws YarnException {
    long start = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
    ApplicationId appId = request.getApplicationId();
    String appZNode = getNodePath(appsZNode, appId.toString());

    boolean exists = false;
    try {
      exists = zkManager.exists(appZNode);
    } catch (Exception e) {
      String errMsg = "Cannot check app: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    if (!exists) {
      String errMsg = "Application " + appId + " does not exist";
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    try {
      zkManager.delete(appZNode);
    } catch (Exception e) {
      String errMsg = "Cannot delete app: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addDeleteAppHomeSubClusterDuration(start, end);
    return DeleteApplicationHomeSubClusterResponse.newInstance();
  }

  /**
   * Registers or overwrites a subcluster's membership record. The last
   * heartbeat is stamped with the current time.
   *
   * @param request the subcluster information to store.
   * @return an empty response.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public SubClusterRegisterResponse registerSubCluster(
      SubClusterRegisterRequest request) throws YarnException {
    long start = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);
    SubClusterInfo subClusterInfo = request.getSubClusterInfo();
    SubClusterId subclusterId = subClusterInfo.getSubClusterId();

    // Update the heartbeat time
    long currentTime = getCurrentTime();
    subClusterInfo.setLastHeartBeat(currentTime);

    try {
      putSubclusterInfo(subclusterId, subClusterInfo, true);
    } catch (Exception e) {
      String errMsg = "Cannot register subcluster: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addRegisterSubClusterDuration(start, end);
    return SubClusterRegisterResponse.newInstance();
  }

  /**
   * Sets the state of a registered subcluster, typically to a deregistered
   * state. The membership record itself is kept.
   *
   * @param request the subcluster and the state to store.
   * @return an empty response.
   * @throws YarnException if the request is invalid, the subcluster is not
   *           registered, or ZooKeeper fails.
   */
  @Override
  public SubClusterDeregisterResponse deregisterSubCluster(
      SubClusterDeregisterRequest request) throws YarnException {
    long start = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);
    SubClusterId subClusterId = request.getSubClusterId();
    SubClusterState state = request.getState();

    // Get the current information and update it
    SubClusterInfo subClusterInfo = getSubclusterInfo(subClusterId);
    if (subClusterInfo == null) {
      String errMsg = "SubCluster " + subClusterId + " not found";
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    } else {
      subClusterInfo.setState(state);
      putSubclusterInfo(subClusterId, subClusterInfo, true);
    }
    long end = clock.getTime();
    opDurations.addDeregisterSubClusterDuration(start, end);
    return SubClusterDeregisterResponse.newInstance();
  }

  /**
   * Refreshes the heartbeat time, state and capability of a registered
   * subcluster.
   *
   * @param request the subcluster with its reported state and capability.
   * @return an empty response.
   * @throws YarnException if the request is invalid, the subcluster is not
   *           registered, or ZooKeeper fails.
   */
  @Override
  public SubClusterHeartbeatResponse subClusterHeartbeat(
      SubClusterHeartbeatRequest request) throws YarnException {
    long start = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);
    SubClusterId subClusterId = request.getSubClusterId();

    SubClusterInfo subClusterInfo = getSubclusterInfo(subClusterId);
    if (subClusterInfo == null) {
      String errMsg = "SubCluster " + subClusterId
          + " does not exist; cannot heartbeat";
      // logAndThrowStoreException always throws, so the code below never
      // sees a null subClusterInfo.
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    long currentTime = getCurrentTime();
    subClusterInfo.setLastHeartBeat(currentTime);
    subClusterInfo.setState(request.getState());
    subClusterInfo.setCapability(request.getCapability());

    putSubclusterInfo(subClusterId, subClusterInfo, true);
    long end = clock.getTime();
    opDurations.addSubClusterHeartbeatDuration(start, end);
    return SubClusterHeartbeatResponse.newInstance();
  }

  /**
   * Reads the membership record of one subcluster.
   *
   * @param request the subcluster to look up.
   * @return the subcluster information, or {@code null} if the subcluster is
   *         not registered.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public GetSubClusterInfoResponse getSubCluster(
      GetSubClusterInfoRequest request) throws YarnException {
    long start = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);
    SubClusterId subClusterId = request.getSubClusterId();
    SubClusterInfo subClusterInfo = null;
    try {
      subClusterInfo = getSubclusterInfo(subClusterId);
      if (subClusterInfo == null) {
        LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
        return null;
      }
    } catch (Exception e) {
      String errMsg = "Cannot get subcluster: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addGetSubClusterDuration(start, end);
    return GetSubClusterInfoResponse.newInstance(subClusterInfo);
  }

  /**
   * Lists registered subclusters, optionally only the active ones.
   * Subclusters whose znode disappears while listing are skipped.
   *
   * @param request whether inactive subclusters should be filtered out.
   * @return the matching subcluster records.
   * @throws YarnException if ZooKeeper fails.
   */
  @Override
  public GetSubClustersInfoResponse getSubClusters(
      GetSubClustersInfoRequest request) throws YarnException {
    long start = clock.getTime();
    List<SubClusterInfo> result = new ArrayList<>();

    try {
      for (String child : zkManager.getChildren(membershipZNode)) {
        SubClusterId subClusterId = SubClusterId.newInstance(child);
        SubClusterInfo info = getSubclusterInfo(subClusterId);
        if (info == null) {
          // Removed concurrently; previously this caused an NPE below.
          LOG.debug("SubCluster {} was removed while listing", subClusterId);
          continue;
        }
        if (!request.getFilterInactiveSubClusters() ||
            info.getState().isActive()) {
          result.add(info);
        }
      }
    } catch (Exception e) {
      String errMsg = "Cannot get subclusters: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addGetSubClustersDuration(start, end);
    return GetSubClustersInfoResponse.newInstance(result);
  }

  /**
   * Reads the policy configured for one queue.
   *
   * @param request the queue to look up.
   * @return the queue's policy, or {@code null} if none is stored.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
      GetSubClusterPolicyConfigurationRequest request) throws YarnException {

    long start = clock.getTime();
    FederationPolicyStoreInputValidator.validate(request);
    String queue = request.getQueue();
    SubClusterPolicyConfiguration policy = null;
    try {
      policy = getPolicy(queue);
    } catch (Exception e) {
      String errMsg = "Cannot get policy: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    if (policy == null) {
      LOG.warn("Policy for queue: {} does not exist.", queue);
      return null;
    }

    long end = clock.getTime();
    opDurations.addGetPolicyConfigurationDuration(start, end);
    return GetSubClusterPolicyConfigurationResponse.newInstance(policy);
  }

  /**
   * Creates or overwrites the policy of the queue named in the configuration.
   *
   * @param request the policy configuration to store.
   * @return an empty response.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
      SetSubClusterPolicyConfigurationRequest request) throws YarnException {

    long start = clock.getTime();
    FederationPolicyStoreInputValidator.validate(request);
    SubClusterPolicyConfiguration policy = request.getPolicyConfiguration();
    try {
      String queue = policy.getQueue();
      putPolicy(queue, policy, true);
    } catch (Exception e) {
      String errMsg = "Cannot set policy: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addSetPolicyConfigurationDuration(start, end);
    return SetSubClusterPolicyConfigurationResponse.newInstance();
  }

  /**
   * Lists the policies of all queues. Queues whose znode disappears while
   * listing are skipped.
   *
   * @param request unused by this implementation.
   * @return all stored policy configurations.
   * @throws YarnException if ZooKeeper fails.
   */
  @Override
  public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
      GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {

    long start = clock.getTime();
    List<SubClusterPolicyConfiguration> result = new ArrayList<>();

    try {
      for (String child : zkManager.getChildren(policiesZNode)) {
        SubClusterPolicyConfiguration policy = getPolicy(child);
        if (policy == null) {
          LOG.warn("Policy for queue: {} does not exist.", child);
          continue;
        }
        result.add(policy);
      }
    } catch (Exception e) {
      String errMsg = "Cannot get policies: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addGetPoliciesConfigurationsDuration(start, end);
    return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
  }

  /** Versioning is not implemented by this store; always returns null. */
  @Override
  public Version getCurrentVersion() {
    return null;
  }

  /** Versioning is not implemented by this store; always returns null. */
  @Override
  public Version loadVersion() {
    return null;
  }

  /**
   * Get the subcluster for an application.
   * @param appId Application identifier.
   * @return Subcluster identifier, or null if the application znode does not
   *         exist.
   * @throws YarnException If it cannot contact ZooKeeper or the stored data
   *           cannot be parsed.
   */
  private SubClusterId getApp(final ApplicationId appId) throws YarnException {
    String appZNode = getNodePath(appsZNode, appId.toString());

    SubClusterId subClusterId = null;
    byte[] data = get(appZNode);
    if (data != null) {
      try {
        subClusterId = new SubClusterIdPBImpl(
            SubClusterIdProto.parseFrom(data));
      } catch (InvalidProtocolBufferException e) {
        String errMsg = "Cannot parse application at " + appZNode;
        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
      }
    }
    return subClusterId;
  }

  /**
   * Put an application.
   * @param appId Application identifier.
   * @param subClusterId Subcluster identifier; expected to be a
   *          {@link SubClusterIdPBImpl} (it is cast to one).
   * @param update Whether to overwrite an existing mapping.
   * @throws YarnException If it cannot contact ZooKeeper.
   */
  private void putApp(final ApplicationId appId,
      final SubClusterId subClusterId, boolean update)
      throws YarnException {
    String appZNode = getNodePath(appsZNode, appId.toString());
    SubClusterIdProto proto =
        ((SubClusterIdPBImpl) subClusterId).getProto();
    byte[] data = proto.toByteArray();
    put(appZNode, data, update);
  }

  /**
   * Get the current information for a subcluster from Zookeeper.
   * @param subclusterId Subcluster identifier.
   * @return Subcluster information or null if it doesn't exist.
   * @throws YarnException If it cannot contact ZooKeeper or the stored data
   *           cannot be parsed.
   */
  private SubClusterInfo getSubclusterInfo(final SubClusterId subclusterId)
      throws YarnException {
    String memberZNode = getNodePath(membershipZNode, subclusterId.toString());

    SubClusterInfo subClusterInfo = null;
    byte[] data = get(memberZNode);
    if (data != null) {
      try {
        subClusterInfo = new SubClusterInfoPBImpl(
            SubClusterInfoProto.parseFrom(data));
      } catch (InvalidProtocolBufferException e) {
        String errMsg = "Cannot parse subcluster info at " + memberZNode;
        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
      }
    }
    return subClusterInfo;
  }

  /**
   * Put the subcluster information in Zookeeper.
   * @param subclusterId Subcluster identifier.
   * @param subClusterInfo Subcluster information; expected to be a
   *          {@link SubClusterInfoPBImpl} (it is cast to one).
   * @param update Whether to overwrite an existing record.
   * @throws YarnException If it cannot contact ZooKeeper.
   */
  private void putSubclusterInfo(final SubClusterId subclusterId,
      final SubClusterInfo subClusterInfo, final boolean update)
      throws YarnException {
    String memberZNode = getNodePath(membershipZNode, subclusterId.toString());
    SubClusterInfoProto proto =
        ((SubClusterInfoPBImpl) subClusterInfo).getProto();
    byte[] data = proto.toByteArray();
    put(memberZNode, data, update);
  }

  /**
   * Get the queue policy from Zookeeper.
   * @param queue Name of the queue.
   * @return Subcluster policy configuration, or null if none is stored.
   * @throws YarnException If it cannot contact ZooKeeper or the stored data
   *           cannot be parsed.
   */
  private SubClusterPolicyConfiguration getPolicy(final String queue)
      throws YarnException {
    String policyZNode = getNodePath(policiesZNode, queue);

    SubClusterPolicyConfiguration policy = null;
    byte[] data = get(policyZNode);
    if (data != null) {
      try {
        policy = new SubClusterPolicyConfigurationPBImpl(
            SubClusterPolicyConfigurationProto.parseFrom(data));
      } catch (InvalidProtocolBufferException e) {
        String errMsg = "Cannot parse policy at " + policyZNode;
        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
      }
    }
    return policy;
  }

  /**
   * Put the queue policy in Zookeeper.
   * @param queue Name of the queue.
   * @param policy Subcluster policy configuration; expected to be a
   *          {@link SubClusterPolicyConfigurationPBImpl} (it is cast to one).
   * @param update Whether to overwrite an existing policy.
   * @throws YarnException If it cannot contact ZooKeeper.
   */
  private void putPolicy(final String queue,
      final SubClusterPolicyConfiguration policy, boolean update)
      throws YarnException {
    String policyZNode = getNodePath(policiesZNode, queue);

    SubClusterPolicyConfigurationProto proto =
        ((SubClusterPolicyConfigurationPBImpl) policy).getProto();
    byte[] data = proto.toByteArray();
    put(policyZNode, data, update);
  }

  /**
   * Get data from a znode in Zookeeper.
   * @param znode Path of the znode.
   * @return Data in the znode, or null if the znode does not exist.
   * @throws YarnException If it cannot contact ZooKeeper.
   */
  private byte[] get(String znode) throws YarnException {
    boolean exists = false;
    try {
      exists = zkManager.exists(znode);
    } catch (Exception e) {
      String errMsg = "Cannot find znode " + znode + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    if (!exists) {
      // A missing znode is an expected outcome (e.g. unknown application or
      // subcluster); callers decide whether it is an error.
      LOG.debug("{} does not exist", znode);
      return null;
    }

    byte[] data = null;
    try {
      data = zkManager.getData(znode);
    } catch (Exception e) {
      String errMsg = "Cannot get data from znode " + znode
          + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    return data;
  }

  /**
   * Put data into a znode in Zookeeper.
   * @param znode Path of the znode.
   * @param data Data to write.
   * @param update If false and the znode already exists, nothing is written.
   * @throws YarnException If it cannot contact ZooKeeper.
   */
  private void put(String znode, byte[] data, boolean update)
      throws YarnException {
    // Create the znode.
    // NOTE(review): create and setData are two separate calls, so a
    // concurrent reader may briefly see the znode before its data is written.
    boolean created = false;
    try {
      created = zkManager.create(znode);
    } catch (Exception e) {
      String errMsg = "Cannot create znode " + znode + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    if (!created) {
      LOG.debug("{} not created", znode);
      if (!update) {
        LOG.info("{} already existed and we are not updating", znode);
        return;
      }
    }

    // Write the data into the znode; version -1 means "any version"
    try {
      zkManager.setData(znode, data, -1);
    } catch (Exception e) {
      String errMsg = "Cannot write data into znode " + znode
          + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
  }

  /**
   * Get the current time.
   * @return Current time in milliseconds since the epoch (the UTC calendar
   *         does not change the epoch value; it only fixes the time zone).
   */
  private static long getCurrentTime() {
    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    return cal.getTimeInMillis();
  }

  /**
   * Put a reservation's home subcluster in Zookeeper.
   * @param reservationId Reservation identifier.
   * @param subClusterId Subcluster identifier; expected to be a
   *          {@link SubClusterIdPBImpl} (it is cast to one).
   * @param update Whether to overwrite an existing mapping.
   * @throws YarnException If it cannot contact ZooKeeper.
   */
  private void putReservation(final ReservationId reservationId,
      final SubClusterId subClusterId, boolean update) throws YarnException {
    String reservationZNode = getNodePath(reservationsZNode, reservationId.toString());
    SubClusterIdProto proto = ((SubClusterIdPBImpl) subClusterId).getProto();
    byte[] data = proto.toByteArray();
    put(reservationZNode, data, update);
  }

  /**
   * Get the home subcluster of a reservation from Zookeeper.
   * @param reservationId Reservation identifier.
   * @return Subcluster identifier, or null if the reservation znode does not
   *         exist.
   * @throws YarnException If it cannot contact ZooKeeper or the stored data
   *           cannot be parsed.
   */
  private SubClusterId getReservation(final ReservationId reservationId)
      throws YarnException {
    String reservationIdZNode = getNodePath(reservationsZNode, reservationId.toString());
    SubClusterId subClusterId = null;
    byte[] data = get(reservationIdZNode);
    if (data != null) {
      try {
        subClusterId = new SubClusterIdPBImpl(SubClusterIdProto.parseFrom(data));
      } catch (InvalidProtocolBufferException e) {
        // Report the znode path, consistent with the other parse errors.
        String errMsg = "Cannot parse reservation at " + reservationIdZNode;
        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
      }
    }
    return subClusterId;
  }

  /** @return the operation-duration metrics recorder, for tests. */
  @VisibleForTesting
  public ZKFederationStateStoreOpDurations getOpDurations() {
    return opDurations;
  }

  /**
   * Records the home subcluster of a reservation, unless one is already
   * stored. The existing mapping always wins.
   *
   * @param request the reservation and its proposed home subcluster.
   * @return a response carrying the home subcluster actually stored.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
      AddReservationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
    ReservationId reservationId = reservationHomeSubCluster.getReservationId();

    // Try to write the subcluster (update=false keeps an existing mapping)
    SubClusterId homeSubCluster = reservationHomeSubCluster.getHomeSubCluster();
    try {
      putReservation(reservationId, homeSubCluster, false);
    } catch (Exception e) {
      String errMsg = "Cannot add reservation home subcluster for " + reservationId
          + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    // Check for the actual subcluster: read back what is stored now
    try {
      homeSubCluster = getReservation(reservationId);
    } catch (Exception e) {
      String errMsg = "Cannot check reservation home subcluster for " + reservationId
          + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    long end = clock.getTime();
    opDurations.addReservationHomeSubClusterDuration(start, end);
    return AddReservationHomeSubClusterResponse.newInstance(homeSubCluster);
  }

  /**
   * Looks up the home subcluster of one reservation.
   *
   * @param request the reservation to look up.
   * @return the reservation together with its home subcluster.
   * @throws YarnException if the request is invalid, the reservation does not
   *           exist, or ZooKeeper fails.
   */
  @Override
  public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
      GetReservationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    ReservationId reservationId = request.getReservationId();
    SubClusterId homeSubCluster = getReservation(reservationId);

    if (homeSubCluster == null) {
      String errMsg = "Reservation " + reservationId + " does not exist";
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    ReservationHomeSubCluster reservationHomeSubCluster =
        ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
    long end = clock.getTime();
    opDurations.addGetReservationHomeSubClusterDuration(start, end);
    return GetReservationHomeSubClusterResponse.newInstance(reservationHomeSubCluster);
  }

  /**
   * Lists every reservation with its home subcluster. Reservations whose
   * znode disappears between listing and reading are skipped.
   *
   * @param request unused by this implementation.
   * @return all reservation-to-home-subcluster mappings found.
   * @throws YarnException if ZooKeeper fails or a child name is not a valid
   *           reservation id.
   */
  @Override
  public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
      GetReservationsHomeSubClusterRequest request) throws YarnException {
    long start = clock.getTime();
    List<ReservationHomeSubCluster> result = new ArrayList<>();

    try {
      for (String child : zkManager.getChildren(reservationsZNode)) {
        ReservationId reservationId = ReservationId.parseReservationId(child);
        SubClusterId homeSubCluster = getReservation(reservationId);
        if (homeSubCluster == null) {
          // Removed concurrently; do not report a mapping without a home.
          LOG.debug("Reservation {} was removed while listing", reservationId);
          continue;
        }
        result.add(
            ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster));
      }
    } catch (Exception e) {
      String errMsg = "Cannot get reservations: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    long end = clock.getTime();
    opDurations.addGetReservationsHomeSubClusterDuration(start, end);
    return GetReservationsHomeSubClusterResponse.newInstance(result);
  }

  /**
   * Removes the home subcluster mapping of a reservation.
   *
   * @param request the reservation to remove.
   * @return an empty response.
   * @throws YarnException if the request is invalid, the reservation does not
   *           exist, or ZooKeeper fails.
   */
  @Override
  public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
      DeleteReservationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    ReservationId reservationId = request.getReservationId();
    String reservationZNode = getNodePath(reservationsZNode, reservationId.toString());

    boolean exists = false;
    try {
      exists = zkManager.exists(reservationZNode);
    } catch (Exception e) {
      String errMsg = "Cannot check reservation: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    if (!exists) {
      String errMsg = "Reservation " + reservationId + " does not exist";
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    try {
      zkManager.delete(reservationZNode);
    } catch (Exception e) {
      String errMsg = "Cannot delete reservation: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    long end = clock.getTime();
    opDurations.addDeleteReservationHomeSubClusterDuration(start, end);
    return DeleteReservationHomeSubClusterResponse.newInstance();
  }

  /**
   * Overwrites the home subcluster of a reservation that is already stored.
   *
   * @param request the reservation and its new home subcluster.
   * @return an empty response.
   * @throws YarnException if the request is invalid, the reservation does not
   *           exist, or ZooKeeper fails.
   */
  @Override
  public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
      UpdateReservationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
    ReservationId reservationId = reservationHomeSubCluster.getReservationId();
    SubClusterId homeSubCluster = getReservation(reservationId);

    if (homeSubCluster == null) {
      String errMsg = "Reservation " + reservationId + " does not exist";
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    SubClusterId newSubClusterId = reservationHomeSubCluster.getHomeSubCluster();
    putReservation(reservationId, newSubClusterId, true);

    long end = clock.getTime();
    opDurations.addUpdateReservationHomeSubClusterDuration(start, end);
    return UpdateReservationHomeSubClusterResponse.newInstance();
  }

  /** Not supported by the ZooKeeper store. */
  @Override
  public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
      throws YarnException, IOException {
    throw new NotImplementedException("Code is not implemented");
  }

  /** Not supported by the ZooKeeper store. */
  @Override
  public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
      throws YarnException, IOException {
    throw new NotImplementedException("Code is not implemented");
  }

  /** Not supported by the ZooKeeper store. */
  @Override
  public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
      throws YarnException, IOException {
    throw new NotImplementedException("Code is not implemented");
  }
}