blob: 0fa48b4939d137f853a120ec4be105cba1178764 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* A Zookeeper-based implementation of {@link YarnConfigurationStore}.
*/
public class ZKConfigurationStore extends YarnConfigurationStore {
public static final Logger LOG =
LoggerFactory.getLogger(ZKConfigurationStore.class);
private long maxLogs;
@VisibleForTesting
protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1);
private Configuration conf;
private String znodeParentPath;
private static final String ZK_VERSION_PATH = "VERSION";
private static final String LOGS_PATH = "LOGS";
private static final String CONF_STORE_PATH = "CONF_STORE";
private static final String FENCING_PATH = "FENCING";
private static final String CONF_VERSION_PATH = "CONF_VERSION";
private String zkVersionPath;
private String logsPath;
private String confStorePath;
private String fencingNodePath;
private String confVersionPath;
@VisibleForTesting
protected ZKCuratorManager zkManager;
private List<ACL> zkAcl;
@Override
public void initialize(Configuration config, Configuration schedConf,
RMContext rmContext) throws Exception {
this.conf = config;
this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS);
this.znodeParentPath =
conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
this.zkManager =
rmContext.getResourceManager().createAndStartZKManager(conf);
this.zkAcl = ZKCuratorManager.getZKAcls(conf);
this.zkVersionPath = getNodePath(znodeParentPath, ZK_VERSION_PATH);
this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
this.confVersionPath = getNodePath(znodeParentPath, CONF_VERSION_PATH);
zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
zkManager.delete(fencingNodePath);
if (!zkManager.exists(logsPath)) {
zkManager.create(logsPath);
zkManager.setData(logsPath,
serializeObject(new LinkedList<LogMutation>()), -1);
}
if (!zkManager.exists(confVersionPath)) {
zkManager.create(confVersionPath);
zkManager.setData(confVersionPath, String.valueOf(0), -1);
}
if (!zkManager.exists(confStorePath)) {
zkManager.create(confStorePath);
HashMap<String, String> mapSchedConf = new HashMap<>();
for (Map.Entry<String, String> entry : schedConf) {
mapSchedConf.put(entry.getKey(), entry.getValue());
}
zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
long configVersion = getConfigVersion() + 1L;
zkManager.setData(confVersionPath, String.valueOf(configVersion), -1);
}
}
@VisibleForTesting
protected LinkedList<LogMutation> getLogs() throws Exception {
return (LinkedList<LogMutation>)
deserializeObject(zkManager.getData(logsPath));
}
// TODO: following version-related code is taken from ZKRMStateStore
@Override
public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
@Override
public Version getConfStoreVersion() throws Exception {
if (zkManager.exists(zkVersionPath)) {
byte[] data = zkManager.getData(zkVersionPath);
return new VersionPBImpl(YarnServerCommonProtos.VersionProto
.parseFrom(data));
}
return null;
}
@Override
public void format() throws Exception {
zkManager.delete(confStorePath);
}
@Override
public synchronized void storeVersion() throws Exception {
byte[] data =
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (zkManager.exists(zkVersionPath)) {
zkManager.safeSetData(zkVersionPath, data, -1, zkAcl, fencingNodePath);
} else {
zkManager.safeCreate(zkVersionPath, data, zkAcl, CreateMode.PERSISTENT,
zkAcl, fencingNodePath);
}
}
@Override
public void logMutation(LogMutation logMutation) throws Exception {
if (maxLogs > 0) {
byte[] storedLogs = zkManager.getData(logsPath);
LinkedList<LogMutation> logs = new LinkedList<>();
if (storedLogs != null) {
logs = (LinkedList<LogMutation>) deserializeObject(storedLogs);
}
logs.add(logMutation);
if (logs.size() > maxLogs) {
logs.remove(logs.removeFirst());
}
zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
fencingNodePath);
}
}
@Override
public void confirmMutation(LogMutation pendingMutation,
boolean isValid) throws Exception {
if (isValid) {
Configuration storedConfigs = retrieve();
Map<String, String> mapConf = new HashMap<>();
for (Map.Entry<String, String> storedConf : storedConfigs) {
mapConf.put(storedConf.getKey(), storedConf.getValue());
}
for (Map.Entry<String, String> confChange :
pendingMutation.getUpdates().entrySet()) {
if (confChange.getValue() == null || confChange.getValue().isEmpty()) {
mapConf.remove(confChange.getKey());
} else {
mapConf.put(confChange.getKey(), confChange.getValue());
}
}
zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
zkAcl, fencingNodePath);
long configVersion = getConfigVersion() + 1L;
zkManager.setData(confVersionPath, String.valueOf(configVersion), -1);
}
}
@Override
public synchronized Configuration retrieve() {
byte[] serializedSchedConf;
try {
serializedSchedConf = zkManager.getData(confStorePath);
} catch (Exception e) {
LOG.error("Failed to retrieve configuration from zookeeper store", e);
return null;
}
try {
Map<String, String> map =
(HashMap<String, String>) deserializeObject(serializedSchedConf);
Configuration c = new Configuration(false);
for (Map.Entry<String, String> e : map.entrySet()) {
c.set(e.getKey(), e.getValue());
}
return c;
} catch (Exception e) {
LOG.error("Exception while deserializing scheduler configuration " +
"from store", e);
}
return null;
}
@Override
public long getConfigVersion() throws Exception {
return Long.parseLong(zkManager.getStringData(confVersionPath));
}
@Override
public List<LogMutation> getConfirmedConfHistory(long fromId) {
return null; // unimplemented
}
private static String getNodePath(String root, String nodeName) {
return ZKCuratorManager.getNodePath(root, nodeName);
}
private static byte[] serializeObject(Object o) throws Exception {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);) {
oos.writeObject(o);
oos.flush();
baos.flush();
return baos.toByteArray();
}
}
private static Object deserializeObject(byte[] bytes) throws Exception {
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bais);) {
return ois.readObject();
}
}
}