blob: 7276d3d01433c6256a99748eab21bd25f2c3e8c7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import java.io.IOException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* A Zookeeper-based implementation of {@link YarnConfigurationStore}.
*/
public class ZKConfigurationStore extends YarnConfigurationStore {
public static final Logger LOG =
LoggerFactory.getLogger(ZKConfigurationStore.class);
private long maxLogs;
@VisibleForTesting
protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1);
private Configuration conf;
private static final String ZK_VERSION_PATH = "VERSION";
private static final String LOGS_PATH = "LOGS";
private static final String CONF_STORE_PATH = "CONF_STORE";
private static final String FENCING_PATH = "FENCING";
private static final String CONF_VERSION_PATH = "CONF_VERSION";
private String zkVersionPath;
private String logsPath;
private String confStorePath;
private String fencingNodePath;
private String confVersionPath;
private ZKCuratorManager zkManager;
private List<ACL> zkAcl;
@Override
public void initialize(Configuration config, Configuration schedConf,
RMContext rmContext) throws Exception {
this.conf = config;
String znodeParentPath = conf.get(
YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS);
this.zkManager =
rmContext.getResourceManager().createAndStartZKManager(conf);
this.zkAcl = ZKCuratorManager.getZKAcls(conf);
this.zkVersionPath = getNodePath(znodeParentPath, ZK_VERSION_PATH);
this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
this.confVersionPath = getNodePath(znodeParentPath, CONF_VERSION_PATH);
zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
zkManager.delete(fencingNodePath);
if (createNewZkPath(logsPath)) {
setZkData(logsPath, new LinkedList<LogMutation>());
}
if (createNewZkPath(confVersionPath)) {
setZkData(confVersionPath, String.valueOf(0));
}
if (createNewZkPath(confStorePath)) {
HashMap<String, String> mapSchedConf = new HashMap<>();
for (Map.Entry<String, String> entry : schedConf) {
mapSchedConf.put(entry.getKey(), entry.getValue());
}
setZkData(confStorePath, mapSchedConf);
long configVersion = getConfigVersion() + 1L;
setZkData(confVersionPath, String.valueOf(configVersion));
}
}
@VisibleForTesting
@Override
protected LinkedList<LogMutation> getLogs() throws Exception {
return unsafeCast(deserializeObject(getZkData(logsPath)));
}
@Override
public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
@Override
public Version getConfStoreVersion() throws Exception {
if (zkManager.exists(zkVersionPath)) {
byte[] data = getZkData(zkVersionPath);
return new VersionPBImpl(YarnServerCommonProtos.VersionProto
.parseFrom(data));
}
return null;
}
@Override
public void format() throws Exception {
zkManager.delete(confStorePath);
}
@Override
public synchronized void storeVersion() throws Exception {
byte[] data =
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (zkManager.exists(zkVersionPath)) {
safeSetZkData(zkVersionPath, data);
} else {
safeCreateZkData(zkVersionPath, data);
}
}
@Override
public void logMutation(LogMutation logMutation) throws Exception {
if (maxLogs > 0) {
byte[] storedLogs = getZkData(logsPath);
LinkedList<LogMutation> logs = new LinkedList<>();
if (storedLogs != null) {
logs = unsafeCast(deserializeObject(storedLogs));
}
logs.add(logMutation);
if (logs.size() > maxLogs) {
logs.remove(logs.removeFirst());
}
safeSetZkData(logsPath, logs);
}
}
@Override
public void confirmMutation(LogMutation pendingMutation,
boolean isValid) throws Exception {
if (isValid) {
Configuration storedConfigs = retrieve();
Map<String, String> mapConf = new HashMap<>();
for (Map.Entry<String, String> storedConf : storedConfigs) {
mapConf.put(storedConf.getKey(), storedConf.getValue());
}
for (Map.Entry<String, String> confChange :
pendingMutation.getUpdates().entrySet()) {
if (confChange.getValue() == null || confChange.getValue().isEmpty()) {
mapConf.remove(confChange.getKey());
} else {
mapConf.put(confChange.getKey(), confChange.getValue());
}
}
safeSetZkData(confStorePath, mapConf);
long configVersion = getConfigVersion() + 1L;
setZkData(confVersionPath, String.valueOf(configVersion));
}
}
@Override
public synchronized Configuration retrieve() {
byte[] serializedSchedConf;
try {
serializedSchedConf = getZkData(confStorePath);
} catch (Exception e) {
LOG.error("Failed to retrieve configuration from zookeeper store", e);
return null;
}
try {
Map<String, String> map =
unsafeCast(deserializeObject(serializedSchedConf));
Configuration c = new Configuration(false);
for (Map.Entry<String, String> e : map.entrySet()) {
c.set(e.getKey(), e.getValue());
}
return c;
} catch (Exception e) {
LOG.error("Exception while deserializing scheduler configuration " +
"from store", e);
}
return null;
}
@Override
public long getConfigVersion() throws Exception {
String version = zkManager.getStringData(confVersionPath);
if (version == null) {
throw new IllegalStateException("Config version can not be properly " +
"serialized. Check Zookeeper config version path to locate " +
"the error!");
}
return Long.parseLong(version);
}
@Override
public List<LogMutation> getConfirmedConfHistory(long fromId) {
return null; // unimplemented
}
/**
* Creates a new path in Zookeeper only, if it does not already exist.
*
* @param path Value of the Zookeeper path
* @return <code>true</code>if the creation executed; <code>false</code>
* otherwise.
* @throws Exception
*/
private boolean createNewZkPath(String path) throws Exception {
if (!zkManager.exists(path)) {
zkManager.create(path);
return true;
} else {
return false;
}
}
@VisibleForTesting
protected byte[] getZkData(String path) throws Exception {
return zkManager.getData(path);
}
@VisibleForTesting
protected void setZkData(String path, byte[] data) throws Exception {
zkManager.setData(path, data, -1);
}
private void setZkData(String path, Object data) throws Exception {
setZkData(path, serializeObject(data));
}
private void setZkData(String path, String data) throws Exception {
zkManager.setData(path, data, -1);
}
private void safeSetZkData(String path, byte[] data) throws Exception {
zkManager.safeSetData(path, data, -1, zkAcl, fencingNodePath);
}
private void safeSetZkData(String path, Object data) throws Exception {
safeSetZkData(path, serializeObject(data));
}
@VisibleForTesting
protected void safeCreateZkData(String path, byte[] data) throws Exception {
zkManager.safeCreate(path, data, zkAcl, CreateMode.PERSISTENT,
zkAcl, fencingNodePath);
}
private static String getNodePath(String root, String nodeName) {
return ZKCuratorManager.getNodePath(root, nodeName);
}
private static byte[] serializeObject(Object o) throws Exception {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);) {
oos.writeObject(o);
oos.flush();
baos.flush();
return baos.toByteArray();
}
}
private static Object deserializeObject(byte[] bytes) throws Exception {
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bais);) {
return ois.readObject();
}
}
/**
* Casts an object of type Object to type T. It is essential to emphasize,
* that it is an unsafe operation.
*
* @param o Object to be cast from
* @param <T> Type to cast to
* @return casted object of type T
* @throws ClassCastException
*/
@SuppressWarnings("unchecked")
private static <T> T unsafeCast(Object o) throws ClassCastException {
return (T)o;
}
@Override
public void close() throws IOException {
if (zkManager != null) {
zkManager.close();
}
}
}