blob: 4885f31bc0fd0ee2772eee2e4365d01adff65a94 [file] [log] [blame]
package org.apache.helix;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.helix.manager.zk.GenericZkHelixApiBuilder;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StringTemplate;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provides access to the persistent configuration of the cluster, the instances that live on it,
* and the logical resources assigned to it.
*/
public class ConfigAccessor {
private static Logger LOG = LoggerFactory.getLogger(ConfigAccessor.class);
private static final StringTemplate template = new StringTemplate();
static {
// @formatter:off
template.addEntry(ConfigScopeProperty.CLUSTER, 1, "/{clusterName}/CONFIGS/CLUSTER");
template.addEntry(ConfigScopeProperty.CLUSTER, 2,
"/{clusterName}/CONFIGS/CLUSTER/{clusterName}|SIMPLEKEYS");
template.addEntry(ConfigScopeProperty.PARTICIPANT, 1, "/{clusterName}/CONFIGS/PARTICIPANT");
template.addEntry(ConfigScopeProperty.PARTICIPANT, 2,
"/{clusterName}/CONFIGS/PARTICIPANT/{participantName}|SIMPLEKEYS");
template.addEntry(ConfigScopeProperty.RESOURCE, 1, "/{clusterName}/CONFIGS/RESOURCE");
template.addEntry(ConfigScopeProperty.RESOURCE, 2,
"/{clusterName}/CONFIGS/RESOURCE/{resourceName}|SIMPLEKEYS");
template.addEntry(ConfigScopeProperty.PARTITION, 2,
"/{clusterName}/CONFIGS/RESOURCE/{resourceName}|MAPKEYS");
template.addEntry(ConfigScopeProperty.PARTITION, 3,
"/{clusterName}/CONFIGS/RESOURCE/{resourceName}|MAPMAPKEYS|{partitionName}");
// @formatter:on
}
private final RealmAwareZkClient _zkClient;
// true if ConfigAccessor was instantiated with a HelixZkClient, false otherwise
// This is used for close() to determine how ConfigAccessor should close the underlying ZkClient
private final boolean _usesExternalZkClient;
private ConfigAccessor(RealmAwareZkClient zkClient, boolean usesExternalZkClient) {
_zkClient = zkClient;
_usesExternalZkClient = usesExternalZkClient;
}
/**
* Initialize an accessor with a Zookeeper client
* Note: it is recommended to use the other constructor instead to avoid having to create a
* RealmAwareZkClient.
* @param zkClient
*/
@Deprecated
public ConfigAccessor(RealmAwareZkClient zkClient) {
_zkClient = zkClient;
_usesExternalZkClient = true;
}
/**
* Initialize a ConfigAccessor with a ZooKeeper connect string. It will use a SharedZkClient with
* default settings. Note that ZNRecordSerializer will be used for the internal ZkClient since
* ConfigAccessor only deals with Helix's data models like ResourceConfig.
* @param zkAddress
*/
@Deprecated
public ConfigAccessor(String zkAddress) {
_usesExternalZkClient = false;
// If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
try {
_zkClient = new FederatedZkClient(
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
new RealmAwareZkClient.RealmAwareZkClientConfig()
.setZkSerializer(new ZNRecordSerializer()));
return;
} catch (InvalidRoutingDataException | IllegalStateException e) {
throw new HelixException("Failed to create ConfigAccessor!", e);
}
}
_zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
}
/**
* get config
* @deprecated replaced by {@link #get(HelixConfigScope, String)}
* @param scope
* @param key
* @return value or null if doesn't exist
*/
@Deprecated
public String get(ConfigScope scope, String key) {
Map<String, String> map = get(scope, Arrays.asList(key));
return map.get(key);
}
/**
* get configs
* @deprecated replaced by {@link #get(HelixConfigScope, List<String>)}
* @param scope
* @param keys
* @return
*/
@Deprecated
public Map<String, String> get(ConfigScope scope, List<String> keys) {
if (scope == null || scope.getScope() == null) {
LOG.error("Scope can't be null");
return null;
}
// String value = null;
Map<String, String> map = new HashMap<String, String>();
String clusterName = scope.getClusterName();
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("cluster " + clusterName + " is not setup yet");
}
String scopeStr = scope.getScopeStr();
String[] splits = scopeStr.split("\\|");
ZNRecord record = _zkClient.readData(splits[0], true);
if (record != null) {
if (splits.length == 1) {
for (String key : keys) {
if (record.getSimpleFields().containsKey(key)) {
map.put(key, record.getSimpleField(key));
}
}
} else if (splits.length == 2) {
if (record.getMapField(splits[1]) != null) {
for (String key : keys) {
if (record.getMapField(splits[1]).containsKey(key)) {
map.put(key, record.getMapField(splits[1]).get(key));
}
}
}
}
}
return map;
}
/**
* get a single config entry
* @param scope specification of the entity set to query
* (e.g. cluster, resource, participant, etc.)
* @param key the identifier of the configuration entry
* @return the configuration entry
*/
public String get(HelixConfigScope scope, String key) {
Map<String, String> map = get(scope, Arrays.asList(key));
if (map != null) {
return map.get(key);
}
return null;
}
/**
* get many config entries
* @param scope scope specification of the entity set to query
* (e.g. cluster, resource, participant, etc.)
* @param keys the identifiers of the configuration entries
* @return the configuration entries, organized by key
*/
public Map<String, String> get(HelixConfigScope scope, List<String> keys) {
if (scope == null || scope.getType() == null || !scope.isFullKey()) {
LOG.error("fail to get configs. invalid config scope. scope: {}, keys: {}.", scope, keys);
return null;
}
ZNRecord record = getConfigZnRecord(scope);
if (record == null) {
LOG.warn("No config found at {}.", scope.getZkPath());
return null;
}
Map<String, String> map = new HashMap<String, String>();
String mapKey = scope.getMapKey();
if (mapKey == null) {
for (String key : keys) {
if (record.getSimpleFields().containsKey(key)) {
map.put(key, record.getSimpleField(key));
}
}
} else {
Map<String, String> configMap = record.getMapField(mapKey);
if (configMap == null) {
LOG.warn("No map-field found in {} using mapKey: {}.", record, mapKey);
return null;
}
for (String key : keys) {
if (record.getMapField(mapKey).containsKey(key)) {
map.put(key, record.getMapField(mapKey).get(key));
}
}
}
return map;
}
private ZNRecord getConfigZnRecord(HelixConfigScope scope) {
String clusterName = scope.getClusterName();
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("fail to get configs. cluster " + clusterName + " is not setup yet");
}
return _zkClient.readData(scope.getZkPath(), true);
}
/**
* Set config, create if not exist
* @deprecated replaced by {@link #set(HelixConfigScope, String, String)}
* @param scope
* @param key
* @param value
*/
@Deprecated
public void set(ConfigScope scope, String key, String value) {
Map<String, String> map = new HashMap<String, String>();
map.put(key, value);
set(scope, map);
}
/**
* Set configs, create if not exist
* @deprecated replaced by {@link #set(HelixConfigScope, Map<String, String>)}
* @param scope
* @param keyValueMap
*/
@Deprecated
public void set(ConfigScope scope, Map<String, String> keyValueMap) {
if (scope == null || scope.getScope() == null) {
LOG.error("Scope can't be null.");
return;
}
String clusterName = scope.getClusterName();
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("cluster: " + clusterName + " is NOT setup.");
}
if (scope.getScope() == ConfigScopeProperty.PARTICIPANT) {
String scopeStr = scope.getScopeStr();
String instanceName = scopeStr.substring(scopeStr.lastIndexOf('/') + 1);
if (!ZKUtil.isInstanceSetup(_zkClient, scope.getClusterName(), instanceName,
InstanceType.PARTICIPANT)) {
throw new HelixException(
"instance: " + instanceName + " is NOT setup in cluster: " + clusterName);
}
}
// use "|" to delimit resource and partition. e.g. /MyCluster/CONFIGS/PARTICIPANT/MyDB|MyDB_0
String scopeStr = scope.getScopeStr();
String[] splits = scopeStr.split("\\|");
String id = splits[0].substring(splits[0].lastIndexOf('/') + 1);
ZNRecord update = new ZNRecord(id);
if (splits.length == 1) {
for (String key : keyValueMap.keySet()) {
String value = keyValueMap.get(key);
update.setSimpleField(key, value);
}
} else if (splits.length == 2) {
if (update.getMapField(splits[1]) == null) {
update.setMapField(splits[1], new TreeMap<String, String>());
}
for (String key : keyValueMap.keySet()) {
String value = keyValueMap.get(key);
update.getMapField(splits[1]).put(key, value);
}
}
ZKUtil.createOrMerge(_zkClient, splits[0], update, true, true);
}
/**
* Set config, creating it if it doesn't exist
* @param scope scope specification of the entity set to query
* (e.g. cluster, resource, participant, etc.)
* @param key the identifier of the configuration entry
* @param value the configuration
*/
public void set(HelixConfigScope scope, String key, String value) {
Map<String, String> map = new TreeMap<String, String>();
map.put(key, value);
set(scope, map);
}
/**
* Set multiple configs, creating them if they don't exist
* @param scope scope specification of the entity set to query
* (e.g. cluster, resource, participant, etc.)
* @param keyValueMap configurations organized by their identifiers
*/
public void set(HelixConfigScope scope, Map<String, String> keyValueMap) {
if (scope == null || scope.getType() == null || !scope.isFullKey()) {
LOG.error("fail to set config. invalid config scope. Scope: {}.", scope);
return;
}
String clusterName = scope.getClusterName();
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("fail to set config. cluster: " + clusterName + " is NOT setup.");
}
if (scope.getType() == ConfigScopeProperty.PARTICIPANT) {
if (!ZKUtil.isInstanceSetup(_zkClient, scope.getClusterName(), scope.getParticipantName(),
InstanceType.PARTICIPANT)) {
throw new HelixException("fail to set config. instance: " + scope.getParticipantName()
+ " is NOT setup in cluster: " + clusterName);
}
}
String mapKey = scope.getMapKey();
String zkPath = scope.getZkPath();
String id = zkPath.substring(zkPath.lastIndexOf('/') + 1);
ZNRecord update = new ZNRecord(id);
if (mapKey == null) {
update.getSimpleFields().putAll(keyValueMap);
} else {
update.setMapField(mapKey, keyValueMap);
}
ZKUtil.createOrMerge(_zkClient, zkPath, update, true, true);
}
/**
* Remove config
* @deprecated replaced by {@link #remove(HelixConfigScope, String)}
* @param scope
* @param key
*/
@Deprecated
public void remove(ConfigScope scope, String key) {
remove(scope, Arrays.asList(key));
}
/**
* remove configs
* @deprecated replaced by {@link #remove(HelixConfigScope, List<String>)}
* @param scope
* @param keys
*/
@Deprecated
public void remove(ConfigScope scope, List<String> keys) {
if (scope == null || scope.getScope() == null) {
LOG.error("Scope can't be null.");
return;
}
String clusterName = scope.getClusterName();
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("cluster " + clusterName + " is not setup yet");
}
String scopeStr = scope.getScopeStr();
String[] splits = scopeStr.split("\\|");
String id = splits[0].substring(splits[0].lastIndexOf('/') + 1);
ZNRecord update = new ZNRecord(id);
if (splits.length == 1) {
// subtract doesn't care about value, use empty string
for (String key : keys) {
update.setSimpleField(key, "");
}
} else if (splits.length == 2) {
if (update.getMapField(splits[1]) == null) {
update.setMapField(splits[1], new TreeMap<String, String>());
}
// subtract doesn't care about value, use empty string
for (String key : keys) {
update.getMapField(splits[1]).put(key, "");
}
}
ZKUtil.subtract(_zkClient, splits[0], update);
}
/**
* Remove a single config
* @param scope scope specification of the entity set to query
* (e.g. cluster, resource, participant, etc.)
* @param key the identifier of the configuration entry
*/
public void remove(HelixConfigScope scope, String key) {
remove(scope, Arrays.asList(key));
}
/**
* Remove multiple configs
* @param scope scope specification of the entity set to query
* (e.g. cluster, resource, participant, etc.)
* @param keys the identifiers of the configuration entries
*/
public void remove(HelixConfigScope scope, List<String> keys) {
if (scope == null || scope.getType() == null || !scope.isFullKey()) {
LOG.error("fail to remove. invalid scope: {}, keys: {}", scope, keys);
return;
}
String clusterName = scope.getClusterName();
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("fail to remove. cluster " + clusterName + " is not setup yet");
}
String zkPath = scope.getZkPath();
String mapKey = scope.getMapKey();
String id = zkPath.substring(zkPath.lastIndexOf('/') + 1);
ZNRecord update = new ZNRecord(id);
if (mapKey == null) {
// subtract doesn't care about value, use empty string
for (String key : keys) {
update.setSimpleField(key, "");
}
} else {
update.setMapField(mapKey, new TreeMap<String, String>());
// subtract doesn't care about value, use empty string
for (String key : keys) {
update.getMapField(mapKey).put(key, "");
}
}
ZKUtil.subtract(_zkClient, zkPath, update);
}
/**
* Remove multiple configs
*
* @param scope scope specification of the entity set to query (e.g. cluster, resource,
* participant, etc.)
* @param recordToRemove the ZNRecord that holds the entries that needs to be removed
*/
public void remove(HelixConfigScope scope, ZNRecord recordToRemove) {
if (scope == null || scope.getType() == null || !scope.isFullKey()) {
LOG.error("fail to remove. invalid scope: {}.", scope);
return;
}
String clusterName = scope.getClusterName();
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("fail to remove. cluster " + clusterName + " is not setup yet");
}
String zkPath = scope.getZkPath();
ZKUtil.subtract(_zkClient, zkPath, recordToRemove);
}
/**
* get config keys
* @deprecated replaced by {@link #getKeys(HelixConfigScope)}
* @param type
* @param clusterName
* @param keys
* @return
*/
@Deprecated
public List<String> getKeys(ConfigScopeProperty type, String clusterName, String... keys) {
if (type == null || clusterName == null) {
LOG.error("ClusterName|scope can't be null.");
return Collections.emptyList();
}
try {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
LOG.error("cluster {} is not setup yet.", clusterName);
return Collections.emptyList();
}
String[] args = new String[1 + keys.length];
args[0] = clusterName;
System.arraycopy(keys, 0, args, 1, keys.length);
String scopeStr = template.instantiate(type, args);
String[] splits = scopeStr.split("\\|");
List<String> retKeys = null;
if (splits.length == 1) {
retKeys = _zkClient.getChildren(splits[0]);
} else {
ZNRecord record = _zkClient.readData(splits[0]);
if (splits[1].startsWith("SIMPLEKEYS")) {
retKeys = new ArrayList<String>(record.getSimpleFields().keySet());
} else if (splits[1].startsWith("MAPKEYS")) {
retKeys = new ArrayList<String>(record.getMapFields().keySet());
} else if (splits[1].startsWith("MAPMAPKEYS")) {
retKeys = new ArrayList<String>(record.getMapField(splits[2]).keySet());
}
}
if (retKeys == null) {
LOG.error("Invalid scope: {} or keys: {}.", type, Arrays.toString(args));
return Collections.emptyList();
}
Collections.sort(retKeys);
return retKeys;
} catch (Exception e) {
return Collections.emptyList();
}
}
/**
* Get list of config keys for a scope
* @param scope
* @return a list of configuration keys
*/
public List<String> getKeys(HelixConfigScope scope) {
if (scope == null || scope.getType() == null) {
LOG.error("Fail to getKeys. Invalid config scope: {}.", scope);
return null;
}
if (!ZKUtil.isClusterSetup(scope.getClusterName(), _zkClient)) {
LOG.error("Fail to getKeys. Cluster {} is not setup yet.", scope.getClusterName());
return Collections.emptyList();
}
String zkPath = scope.getZkPath();
String mapKey = scope.getMapKey();
List<String> retKeys = null;
if (scope.isFullKey()) {
ZNRecord record = _zkClient.readData(zkPath);
if (mapKey == null) {
retKeys = new ArrayList<String>(record.getSimpleFields().keySet());
} else {
retKeys = new ArrayList<String>(record.getMapField(mapKey).keySet());
}
} else {
if (scope.getType() == ConfigScopeProperty.PARTITION) {
ZNRecord record = _zkClient.readData(zkPath);
retKeys = new ArrayList<String>(record.getMapFields().keySet());
} else {
retKeys = _zkClient.getChildren(zkPath);
}
}
if (retKeys != null) {
Collections.sort(retKeys);
}
return retKeys;
}
/**
* Get CustomizedStateConfig of the given cluster.
* @param clusterName
* @return The instance of {@link CustomizedStateConfig}
*/
public CustomizedStateConfig getCustomizedStateConfig(String clusterName) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException(String.format("Failed to get config. cluster: %s is not setup.", clusterName));
}
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.CUSTOMIZED_STATE).forCluster(clusterName).build();
ZNRecord record = getConfigZnRecord(scope);
if (record == null) {
LOG.warn("No customized state aggregation config found at {}.", scope.getZkPath());
return null;
}
return new CustomizedStateConfig.Builder(record).build();
}
/**
* Get ClusterConfig of the given cluster.
*
* @param clusterName
*
* @return
*/
public ClusterConfig getClusterConfig(String clusterName) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("fail to get config. cluster: " + clusterName + " is NOT setup.");
}
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
ZNRecord record = getConfigZnRecord(scope);
if (record == null) {
LOG.warn("No config found at {}.", scope.getZkPath());
return null;
}
return new ClusterConfig(record);
}
/**
* Get CloudConfig of the given cluster.
* @param clusterName
* @return The instance of {@link CloudConfig}
*/
public CloudConfig getCloudConfig(String clusterName) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException(
String.format("Failed to get config. cluster: %s is not setup.", clusterName));
}
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.CLOUD).forCluster(clusterName).build();
ZNRecord record = getConfigZnRecord(scope);
if (record == null) {
LOG.warn("No cloud config found at {}.", scope.getZkPath());
return null;
}
return new CloudConfig(record);
}
/**
* Delete cloud config fields (not the whole config)
* @param clusterName
* @param cloudConfig
*/
public void deleteCloudConfigFields(String clusterName, CloudConfig cloudConfig) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("fail to delete cloud config. cluster: " + clusterName + " is NOT setup.");
}
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.CLOUD).forCluster(clusterName).build();
remove(scope, cloudConfig.getRecord());
}
/**
* Update cloud config
* @param clusterName
* @param cloudConfig
*/
public void updateCloudConfig(String clusterName, CloudConfig cloudConfig) {
updateCloudConfig(clusterName, cloudConfig, false);
}
private void updateCloudConfig(String clusterName, CloudConfig cloudConfig, boolean overwrite) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("Fail to update cloud config. cluster: " + clusterName + " is NOT setup.");
}
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.CLOUD).forCluster(clusterName).build();
String zkPath = scope.getZkPath();
if (overwrite) {
ZKUtil.createOrReplace(_zkClient, zkPath, cloudConfig.getRecord(), true);
} else {
ZKUtil.createOrUpdate(_zkClient, zkPath, cloudConfig.getRecord(), true, true);
}
}
/**
* Get RestConfig of the given cluster.
* @param clusterName The cluster
* @return The instance of {@link RESTConfig}
*/
public RESTConfig getRESTConfig(String clusterName) {
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.REST).forCluster(clusterName).build();
ZNRecord record = getConfigZnRecord(scope);
if (record == null) {
LOG.warn("No rest config found at {}.", scope.getZkPath());
return null;
}
return new RESTConfig(record);
}
/**
* Set RestConfig of a given cluster
* @param clusterName the cluster id
* @param restConfig the RestConfig to be set to the cluster
*/
public void setRESTConfig(String clusterName, RESTConfig restConfig) {
updateRESTConfig(clusterName, restConfig, true);
}
/**
* Update RestConfig of a given cluster
* @param clusterName the cluster id
* @param restConfig the new RestConfig to be set to the cluster
*/
public void updateRESTConfig(String clusterName, RESTConfig restConfig) {
updateRESTConfig(clusterName, restConfig, false);
}
private void updateRESTConfig(String clusterName, RESTConfig restConfig, boolean overwrite) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("Fail to update REST config. cluster: " + clusterName + " is NOT setup.");
}
HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.REST).forCluster(clusterName).build();
String zkPath = scope.getZkPath();
// Create "/{clusterId}/CONFIGS/REST" if it does not exist
String parentPath = HelixUtil.getZkParentPath(zkPath);
if (!_zkClient.exists(parentPath)) {
ZKUtil.createOrMerge(_zkClient, parentPath, new ZNRecord(parentPath), true, true);
}
if (overwrite) {
ZKUtil.createOrReplace(_zkClient, zkPath, restConfig.getRecord(), true);
} else {
ZKUtil.createOrUpdate(_zkClient, zkPath, restConfig.getRecord(), true, true);
}
}
public void deleteRESTConfig(String clusterName) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("Fail to delete REST config. cluster: " + clusterName + " is NOT setup.");
}
HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.REST).forCluster(clusterName).build();
String zkPath = scope.getZkPath();
// Check if "/{clusterId}/CONFIGS/REST" exists
String parentPath = HelixUtil.getZkParentPath(zkPath);
if (!_zkClient.exists(parentPath)) {
throw new HelixException("Fail to delete REST config. cluster: " + clusterName + " does not have a rest config."); }
ZKUtil.dropChildren(_zkClient, parentPath, new ZNRecord(clusterName));
}
/**
* Set ClusterConfig of the given cluster.
* The current Cluster config will be replaced with the given clusterConfig.
* WARNING: This is not thread-safe or concurrent updates safe.
*
* @param clusterName
* @param clusterConfig
*
* @return
*/
public void setClusterConfig(String clusterName, ClusterConfig clusterConfig) {
updateClusterConfig(clusterName, clusterConfig, true);
}
/**
* Update ClusterConfig of the given cluster.
* The value of field in current config will be replaced with the value of the same field in given config if it
* presents. If there is new field in given config but not in current config, the field will be added into
* the current config..
* The list fields and map fields will be replaced as a single entry.
*
* The current Cluster config will be replaced with the given clusterConfig.
* WARNING: This is not thread-safe or concurrent updates safe.
*
* @param clusterName
* @param clusterConfig
*
* @return
*/
public void updateClusterConfig(String clusterName, ClusterConfig clusterConfig) {
updateClusterConfig(clusterName, clusterConfig, false);
}
private void updateClusterConfig(String clusterName, ClusterConfig clusterConfig,
boolean overwrite) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("fail to update config. cluster: " + clusterName + " is NOT setup.");
}
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
String zkPath = scope.getZkPath();
if (overwrite) {
ZKUtil.createOrReplace(_zkClient, zkPath, clusterConfig.getRecord(), true);
} else {
ZKUtil.createOrUpdate(_zkClient, zkPath, clusterConfig.getRecord(), true, true);
}
}
/**
* Get resource config for given resource in given cluster.
*
* @param clusterName
* @param resourceName
*
* @return
*/
public ResourceConfig getResourceConfig(String clusterName, String resourceName) {
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(clusterName)
.forResource(resourceName).build();
ZNRecord record = getConfigZnRecord(scope);
if (record == null) {
LOG.warn("No config found at {}.", scope.getZkPath());
return null;
}
return new ResourceConfig(record);
}
/**
* Set config of the given resource.
* The current Resource config will be replaced with the given clusterConfig.
*
* WARNING: This is not thread-safe or concurrent updates safe.
*
* @param clusterName
* @param resourceName
* @param resourceConfig
*
* @return
*/
public void setResourceConfig(String clusterName, String resourceName,
ResourceConfig resourceConfig) {
updateResourceConfig(clusterName, resourceName, resourceConfig, true);
}
/**
* Update ResourceConfig of the given resource.
* The value of field in current config will be replaced with the value of the same field in given config if it
* presents. If there is new field in given config but not in current config, the field will be added into
* the current config..
* The list fields and map fields will be replaced as a single entry.
*
* The current Cluster config will be replaced with the given clusterConfig.
* WARNING: This is not thread-safe or concurrent updates safe.
*
* @param clusterName
* @param resourceName
* @param resourceConfig
*
* @return
*/
public void updateResourceConfig(String clusterName, String resourceName,
ResourceConfig resourceConfig) {
updateResourceConfig(clusterName, resourceName, resourceConfig, false);
}
private void updateResourceConfig(String clusterName, String resourceName,
ResourceConfig resourceConfig, boolean overwrite) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("fail to setup config. cluster: " + clusterName + " is NOT setup.");
}
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE).forCluster(clusterName)
.forResource(resourceName).build();
String zkPath = scope.getZkPath();
if (overwrite) {
ZKUtil.createOrReplace(_zkClient, zkPath, resourceConfig.getRecord(), true);
} else {
ZKUtil.createOrUpdate(_zkClient, zkPath, resourceConfig.getRecord(), true, true);
}
}
/**
* Get instance config for given resource in given cluster.
*
* @param clusterName
* @param instanceName
*
* @return
*/
public InstanceConfig getInstanceConfig(String clusterName, String instanceName) {
if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) {
throw new HelixException(
"fail to get config. instance: " + instanceName + " is NOT setup in cluster: "
+ clusterName);
}
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT).forCluster(clusterName)
.forParticipant(instanceName).build();
ZNRecord record = getConfigZnRecord(scope);
if (record == null) {
LOG.warn("No config found at {}.", scope.getZkPath());
return null;
}
return new InstanceConfig(record);
}
/**
* Set config of the given instance config.
* The current instance config will be replaced with the given instanceConfig.
* WARNING: This is not thread-safe or concurrent updates safe.
*
* @param clusterName
* @param instanceName
* @param instanceConfig
*
* @return
*/
public void setInstanceConfig(String clusterName, String instanceName,
InstanceConfig instanceConfig) {
updateInstanceConfig(clusterName, instanceName, instanceConfig, true);
}
/**
* Update InstanceConfig of the given resource. The value of field in current config will be
* replaced with the value of the same field in given config if it presents. If there is new field
* in given config but not in current config, the field will be added into the current config..
* The list fields and map fields will be replaced as a single entry.
* The current instanceConfig will be replaced with the given instanceConfig. WARNING: This is not
* thread-safe or concurrent updates safe.
* *
*
* @param clusterName
* @param instanceName
* @param instanceConfig
*
* @return
*/
public void updateInstanceConfig(String clusterName, String instanceName,
InstanceConfig instanceConfig) {
updateInstanceConfig(clusterName, instanceName, instanceConfig, false);
}
private void updateInstanceConfig(String clusterName, String instanceName,
InstanceConfig instanceConfig, boolean overwrite) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("fail to setup config. cluster: " + clusterName + " is NOT setup.");
}
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT).forCluster(clusterName)
.forParticipant(instanceName).build();
String zkPath = scope.getZkPath();
if (!_zkClient.exists(zkPath)) {
throw new HelixException(
"updateInstanceConfig failed. Given InstanceConfig does not already exist. instance: "
+ instanceName);
}
if (overwrite) {
ZKUtil.createOrReplace(_zkClient, zkPath, instanceConfig.getRecord(), true);
} else {
ZKUtil.createOrUpdate(_zkClient, zkPath, instanceConfig.getRecord(), true, true);
}
}
/**
* Closes ConfigAccessor: closes the stateful resources including the ZkClient.
*/
public void close() {
if (_zkClient != null && !_usesExternalZkClient) {
_zkClient.close();
}
}
@Override
public void finalize() {
close();
}
public static class Builder extends GenericZkHelixApiBuilder<Builder> {
public Builder() {
}
public ConfigAccessor build() {
validate();
return new ConfigAccessor(
createZkClient(_realmMode, _realmAwareZkConnectionConfig, _realmAwareZkClientConfig,
_zkAddress), false);
}
}
}