blob: 0a160b94dec0e7781031753f6dafe3f651150230 [file] [log] [blame]
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.fluo.api.config;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.DatatypeConverter;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import io.fluo.api.client.FluoClient;
import org.apache.commons.configuration.AbstractConfiguration;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.SubsetConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Configuration helper class for Fluo. FluoConfiguration extends {@link CompositeConfiguration}.
*/
public class FluoConfiguration extends CompositeConfiguration {
private static final Logger log = LoggerFactory.getLogger(FluoConfiguration.class);
public static final String FLUO_PREFIX = "io.fluo";
// Client properties
private static final String CLIENT_PREFIX = FLUO_PREFIX + ".client";
public static final String CLIENT_APPLICATION_NAME_PROP = CLIENT_PREFIX + ".application.name";
public static final String CLIENT_ACCUMULO_PASSWORD_PROP = CLIENT_PREFIX + ".accumulo.password";
public static final String CLIENT_ACCUMULO_USER_PROP = CLIENT_PREFIX + ".accumulo.user";
public static final String CLIENT_ACCUMULO_INSTANCE_PROP = CLIENT_PREFIX + ".accumulo.instance";
public static final String CLIENT_ACCUMULO_ZOOKEEPERS_PROP = CLIENT_PREFIX
+ ".accumulo.zookeepers";
public static final String CLIENT_ZOOKEEPER_TIMEOUT_PROP = CLIENT_PREFIX + ".zookeeper.timeout";
public static final String CLIENT_ZOOKEEPER_CONNECT_PROP = CLIENT_PREFIX + ".zookeeper.connect";
public static final String CLIENT_RETRY_TIMEOUT_MS_PROP = CLIENT_PREFIX + ".retry.timeout.ms";
public static final String CLIENT_CLASS_PROP = CLIENT_PREFIX + ".class";
public static final int CLIENT_ZOOKEEPER_TIMEOUT_DEFAULT = 30000;
public static final String CLIENT_ACCUMULO_ZOOKEEPERS_DEFAULT = "localhost";
public static final String CLIENT_ZOOKEEPER_CONNECT_DEFAULT = "localhost/fluo";
public static final int CLIENT_RETRY_TIMEOUT_MS_DEFAULT = -1;
public static final String CLIENT_CLASS_DEFAULT = FLUO_PREFIX + ".core.client.FluoClientImpl";
// Administration
private static final String ADMIN_PREFIX = FLUO_PREFIX + ".admin";
public static final String ADMIN_ACCUMULO_TABLE_PROP = ADMIN_PREFIX + ".accumulo.table";
public static final String ADMIN_ACCUMULO_CLASSPATH_PROP = ADMIN_PREFIX + ".accumulo.classpath";
public static final String ADMIN_ACCUMULO_CLASSPATH_DEFAULT = "";
public static final String ADMIN_CLASS_PROP = ADMIN_PREFIX + ".class";
public static final String ADMIN_CLASS_DEFAULT = FLUO_PREFIX + ".core.client.FluoAdminImpl";
// Worker
private static final String WORKER_PREFIX = FLUO_PREFIX + ".worker";
public static final String WORKER_NUM_THREADS_PROP = WORKER_PREFIX + ".num.threads";
public static final String WORKER_INSTANCES_PROP = WORKER_PREFIX + ".instances";
public static final String WORKER_MAX_MEMORY_MB_PROP = WORKER_PREFIX + ".max.memory.mb";
public static final String WORKER_NUM_CORES_PROP = WORKER_PREFIX + ".num.cores";
public static final int WORKER_NUM_THREADS_DEFAULT = 10;
public static final int WORKER_INSTANCES_DEFAULT = 1;
public static final int WORKER_MAX_MEMORY_MB_DEFAULT = 1024;
public static final int WORKER_NUM_CORES_DEFAULT = 1;
// Loader
private static final String LOADER_PREFIX = FLUO_PREFIX + ".loader";
public static final String LOADER_NUM_THREADS_PROP = LOADER_PREFIX + ".num.threads";
public static final String LOADER_QUEUE_SIZE_PROP = LOADER_PREFIX + ".queue.size";
public static final int LOADER_NUM_THREADS_DEFAULT = 10;
public static final int LOADER_QUEUE_SIZE_DEFAULT = 10;
// Oracle
private static final String ORACLE_PREFIX = FLUO_PREFIX + ".oracle";
public static final String ORACLE_PORT_PROP = ORACLE_PREFIX + ".port";
public static final String ORACLE_INSTANCES_PROP = ORACLE_PREFIX + ".instances";
public static final String ORACLE_MAX_MEMORY_MB_PROP = ORACLE_PREFIX + ".max.memory.mb";
public static final String ORACLE_NUM_CORES_PROP = ORACLE_PREFIX + ".num.cores";
public static final int ORACLE_PORT_DEFAULT = 9913;
public static final int ORACLE_INSTANCES_DEFAULT = 1;
public static final int ORACLE_MAX_MEMORY_MB_DEFAULT = 512;
public static final int ORACLE_NUM_CORES_DEFAULT = 1;
// MiniFluo
private static final String MINI_PREFIX = FLUO_PREFIX + ".mini";
public static final String MINI_CLASS_PROP = MINI_PREFIX + ".class";
public static final String MINI_START_ACCUMULO_PROP = MINI_PREFIX + ".start.accumulo";
public static final String MINI_DATA_DIR_PROP = MINI_PREFIX + ".data.dir";
public static final String MINI_CLASS_DEFAULT = FLUO_PREFIX + ".mini.MiniFluoImpl";
public static final boolean MINI_START_ACCUMULO_DEFAULT = true;
public static final String MINI_DATA_DIR_DEFAULT = "${env:FLUO_HOME}/mini";
/** The properties below get loaded into/from Zookeeper */
// Observer
public static final String OBSERVER_PREFIX = FLUO_PREFIX + ".observer.";
// Transaction
public static final String TRANSACTION_PREFIX = FLUO_PREFIX + ".tx";
public static final String TRANSACTION_ROLLBACK_TIME_PROP = TRANSACTION_PREFIX + ".rollback.time";
public static final long TRANSACTION_ROLLBACK_TIME_DEFAULT = 300000;
// Metrics
public static final String METRICS_YAML_BASE64 = FLUO_PREFIX + ".metrics.yaml.base64";
public static final String METRICS_YAML_BASE64_DEFAULT = DatatypeConverter.printBase64Binary(
"---\nfrequency: 60 seconds\n".getBytes(Charsets.UTF_8)).replace("\n", "");
// application config
public static final String APP_PREFIX = FLUO_PREFIX + ".app";
public FluoConfiguration() {
super();
setThrowExceptionOnMissing(true);
setDelimiterParsingDisabled(true);
}
public FluoConfiguration(FluoConfiguration other) {
this();
Iterator<String> iter = other.getKeys();
while (iter.hasNext()) {
String key = iter.next();
setProperty(key, other.getProperty(key));
}
}
public FluoConfiguration(Configuration configuration) {
this();
if (configuration instanceof AbstractConfiguration) {
AbstractConfiguration aconf = (AbstractConfiguration) configuration;
aconf.setDelimiterParsingDisabled(true);
}
addConfiguration(configuration);
}
public FluoConfiguration(File propertiesFile) {
this();
try {
PropertiesConfiguration config = new PropertiesConfiguration();
// disabled to prevent accumulo classpath value from being shortened
config.setDelimiterParsingDisabled(true);
config.load(propertiesFile);
addConfiguration(config);
} catch (ConfigurationException e) {
throw new IllegalArgumentException(e);
}
}
public void validate() {
// keep in alphabetical order
getAccumuloClasspath();
getAccumuloInstance();
getAccumuloPassword();
getAccumuloTable();
getAccumuloUser();
getAccumuloZookeepers();
getAdminClass();
getApplicationName();
getAppZookeepers();
getClientClass();
getClientRetryTimeout();
getLoaderQueueSize();
getLoaderThreads();
getMetricsYaml();
getMetricsYamlBase64();
getObserverConfig();
getOracleInstances();
getOracleMaxMemory();
getOracleNumCores();
getOraclePort();
getTransactionRollbackTime();
getWorkerInstances();
getWorkerMaxMemory();
getWorkerNumCores();
getWorkerThreads();
getZookeeperTimeout();
}
public FluoConfiguration setApplicationName(String applicationName) {
verifyApplicationName(applicationName);
setProperty(CLIENT_APPLICATION_NAME_PROP, applicationName);
return this;
}
public String getApplicationName() {
String applicationName = getString(CLIENT_APPLICATION_NAME_PROP);
verifyApplicationName(applicationName);
return applicationName;
}
/**
* Verifies application name. Avoids characters that Zookeeper does not like in nodes.
*
* @param name Application name
*/
private void verifyApplicationName(String name) {
if (name == null) {
throw new IllegalArgumentException("Application name cannot be null");
}
if (name.length() == 0) {
throw new IllegalArgumentException("Application name length must be > 0");
}
String reason = null;
char[] chars = name.toCharArray();
char c;
for (int i = 0; i < chars.length; i++) {
c = chars[i];
if (c == 0) {
reason = "null character not allowed @" + i;
break;
} else if (c == '/' || c == '.') {
reason = "invalid character @" + i;
break;
} else if (c > '\u0000' && c <= '\u001f' || c >= '\u007f' && c <= '\u009F' || c >= '\ud800'
&& c <= '\uf8ff' || c >= '\ufff0' && c <= '\uffff') {
reason = "invalid charater @" + i;
break;
}
}
if (reason != null) {
throw new IllegalArgumentException("Invalid application name \"" + name + "\" caused by "
+ reason);
}
}
public FluoConfiguration setInstanceZookeepers(String zookeepers) {
return setNonEmptyString(CLIENT_ZOOKEEPER_CONNECT_PROP, zookeepers);
}
public String getInstanceZookeepers() {
return getNonEmptyString(CLIENT_ZOOKEEPER_CONNECT_PROP, CLIENT_ZOOKEEPER_CONNECT_DEFAULT);
}
public String getAppZookeepers() {
return getInstanceZookeepers() + "/" + getApplicationName();
}
public FluoConfiguration setZookeeperTimeout(int timeout) {
return setPositiveInt(CLIENT_ZOOKEEPER_TIMEOUT_PROP, timeout);
}
public int getZookeeperTimeout() {
return getPositiveInt(CLIENT_ZOOKEEPER_TIMEOUT_PROP, CLIENT_ZOOKEEPER_TIMEOUT_DEFAULT);
}
public FluoConfiguration setClientRetryTimeout(int timeoutMS) {
Preconditions.checkArgument(timeoutMS >= -1, CLIENT_RETRY_TIMEOUT_MS_PROP + " must be >= -1");
setProperty(CLIENT_RETRY_TIMEOUT_MS_PROP, timeoutMS);
return this;
}
public int getClientRetryTimeout() {
int retval = getInt(CLIENT_RETRY_TIMEOUT_MS_PROP, CLIENT_RETRY_TIMEOUT_MS_DEFAULT);
Preconditions.checkArgument(retval >= -1, CLIENT_RETRY_TIMEOUT_MS_PROP + " must be >= -1");
return retval;
}
public FluoConfiguration setAccumuloInstance(String accumuloInstance) {
return setNonEmptyString(CLIENT_ACCUMULO_INSTANCE_PROP, accumuloInstance);
}
public String getAccumuloInstance() {
return getNonEmptyString(CLIENT_ACCUMULO_INSTANCE_PROP);
}
public FluoConfiguration setAccumuloUser(String accumuloUser) {
return setNonEmptyString(CLIENT_ACCUMULO_USER_PROP, accumuloUser);
}
public String getAccumuloUser() {
return getNonEmptyString(CLIENT_ACCUMULO_USER_PROP);
}
public FluoConfiguration setAccumuloPassword(String accumuloPassword) {
setProperty(CLIENT_ACCUMULO_PASSWORD_PROP,
verifyNotNull(CLIENT_ACCUMULO_PASSWORD_PROP, accumuloPassword));
return this;
}
public String getAccumuloPassword() {
return verifyNotNull(CLIENT_ACCUMULO_PASSWORD_PROP, getString(CLIENT_ACCUMULO_PASSWORD_PROP));
}
public FluoConfiguration setAccumuloZookeepers(String zookeepers) {
return setNonEmptyString(CLIENT_ACCUMULO_ZOOKEEPERS_PROP, zookeepers);
}
public String getAccumuloZookeepers() {
return getNonEmptyString(CLIENT_ACCUMULO_ZOOKEEPERS_PROP, CLIENT_ACCUMULO_ZOOKEEPERS_DEFAULT);
}
public FluoConfiguration setClientClass(String clientClass) {
return setNonEmptyString(CLIENT_CLASS_PROP, clientClass);
}
public String getClientClass() {
return getNonEmptyString(CLIENT_CLASS_PROP, CLIENT_CLASS_DEFAULT);
}
/**
* Sets Accumulo table. This property only needs to be set for FluoAdmin as it will be stored in
* retrieved from Zookeeper for clients.
*/
public FluoConfiguration setAccumuloTable(String table) {
return setNonEmptyString(ADMIN_ACCUMULO_TABLE_PROP, table);
}
public String getAccumuloTable() {
return getNonEmptyString(ADMIN_ACCUMULO_TABLE_PROP);
}
public FluoConfiguration setAccumuloClasspath(String path) {
setProperty(ADMIN_ACCUMULO_CLASSPATH_PROP, verifyNotNull(ADMIN_ACCUMULO_CLASSPATH_PROP, path));
return this;
}
public String getAccumuloClasspath() {
return getString(ADMIN_ACCUMULO_CLASSPATH_PROP, ADMIN_ACCUMULO_CLASSPATH_DEFAULT);
}
public FluoConfiguration setAdminClass(String adminClass) {
return setNonEmptyString(ADMIN_CLASS_PROP, adminClass);
}
public String getAdminClass() {
return getNonEmptyString(ADMIN_CLASS_PROP, ADMIN_CLASS_DEFAULT);
}
public FluoConfiguration setWorkerThreads(int numThreads) {
return setPositiveInt(WORKER_NUM_THREADS_PROP, numThreads);
}
public int getWorkerThreads() {
return getPositiveInt(WORKER_NUM_THREADS_PROP, WORKER_NUM_THREADS_DEFAULT);
}
public List<ObserverConfiguration> getObserverConfig() {
List<ObserverConfiguration> configList = new ArrayList<>();
Iterator<String> iter = getKeys();
while (iter.hasNext()) {
String key = iter.next();
if (key.startsWith(FluoConfiguration.OBSERVER_PREFIX)) {
String value = getString(key).trim();
if (value.isEmpty()) {
throw new IllegalArgumentException(key + " is set to empty value");
}
String[] fields = value.split(",");
if (fields.length == 0) {
throw new IllegalArgumentException(key + " has bad value: " + value);
}
String className = fields[0];
if (className.isEmpty()) {
throw new IllegalArgumentException(key + " has empty class name: " + className);
}
ObserverConfiguration observerConfig = new ObserverConfiguration(className);
Map<String, String> params = new HashMap<>();
for (int i = 1; i < fields.length; i++) {
String[] kv = fields[i].split("=");
if (kv.length != 2) {
throw new IllegalArgumentException(key
+ " has invalid param. Expected 'key=value' but encountered '" + fields[i] + "'");
}
if (kv[0].isEmpty() || kv[1].isEmpty()) {
throw new IllegalArgumentException(key + " has empty key or value in param: "
+ fields[i]);
}
params.put(kv[0], kv[1]);
}
observerConfig.setParameters(params);
configList.add(observerConfig);
}
}
return configList;
}
/**
* Sets the {@link ObserverConfiguration} for observers
*/
public FluoConfiguration setObservers(List<ObserverConfiguration> observers) {
Iterator<String> iter1 = getKeys(OBSERVER_PREFIX);
while (iter1.hasNext()) {
String key = iter1.next();
if (key.substring(OBSERVER_PREFIX.length()).matches("\\d+")) {
clearProperty(key);
}
}
int count = 0;
for (ObserverConfiguration oconf : observers) {
Map<String, String> params = oconf.getParameters();
StringBuilder paramString = new StringBuilder();
for (java.util.Map.Entry<String, String> pentry : params.entrySet()) {
paramString.append(',');
paramString.append(pentry.getKey());
paramString.append('=');
paramString.append(pentry.getValue());
}
setProperty(OBSERVER_PREFIX + "" + count, oconf.getClassName() + paramString);
count++;
}
return this;
}
public FluoConfiguration setTransactionRollbackTime(long time, TimeUnit tu) {
return setPositiveLong(TRANSACTION_ROLLBACK_TIME_PROP, tu.toMillis(time));
}
public long getTransactionRollbackTime() {
return getPositiveLong(TRANSACTION_ROLLBACK_TIME_PROP, TRANSACTION_ROLLBACK_TIME_DEFAULT);
}
public FluoConfiguration setWorkerInstances(int workerInstances) {
return setPositiveInt(WORKER_INSTANCES_PROP, workerInstances);
}
public int getWorkerInstances() {
return getPositiveInt(WORKER_INSTANCES_PROP, WORKER_INSTANCES_DEFAULT);
}
public FluoConfiguration setWorkerMaxMemory(int maxMemoryMB) {
return setPositiveInt(WORKER_MAX_MEMORY_MB_PROP, maxMemoryMB);
}
public int getWorkerMaxMemory() {
return getPositiveInt(WORKER_MAX_MEMORY_MB_PROP, WORKER_MAX_MEMORY_MB_DEFAULT);
}
public FluoConfiguration setWorkerNumCores(int numCores) {
return setPositiveInt(WORKER_NUM_CORES_PROP, numCores);
}
public int getWorkerNumCores() {
return getPositiveInt(WORKER_NUM_CORES_PROP, WORKER_NUM_CORES_DEFAULT);
}
public FluoConfiguration setLoaderThreads(int numThreads) {
return setNonNegativeInt(LOADER_NUM_THREADS_PROP, numThreads);
}
public int getLoaderThreads() {
return getNonNegativeInt(LOADER_NUM_THREADS_PROP, LOADER_NUM_THREADS_DEFAULT);
}
public FluoConfiguration setLoaderQueueSize(int queueSize) {
return setNonNegativeInt(LOADER_QUEUE_SIZE_PROP, queueSize);
}
public int getLoaderQueueSize() {
return getNonNegativeInt(LOADER_QUEUE_SIZE_PROP, LOADER_QUEUE_SIZE_DEFAULT);
}
public FluoConfiguration setOracleMaxMemory(int oracleMaxMemory) {
return setPositiveInt(ORACLE_MAX_MEMORY_MB_PROP, oracleMaxMemory);
}
public int getOracleMaxMemory() {
return getPositiveInt(ORACLE_MAX_MEMORY_MB_PROP, ORACLE_MAX_MEMORY_MB_DEFAULT);
}
public FluoConfiguration setOraclePort(int oraclePort) {
return setPort(ORACLE_PORT_PROP, oraclePort);
}
public int getOraclePort() {
return getPort(ORACLE_PORT_PROP, ORACLE_PORT_DEFAULT);
}
public FluoConfiguration setOracleInstances(int oracleInstances) {
return setPositiveInt(ORACLE_INSTANCES_PROP, oracleInstances);
}
public int getOracleInstances() {
return getPositiveInt(ORACLE_INSTANCES_PROP, ORACLE_INSTANCES_DEFAULT);
}
public FluoConfiguration setOracleNumCores(int numCores) {
return setPositiveInt(ORACLE_NUM_CORES_PROP, numCores);
}
public int getOracleNumCores() {
return getPositiveInt(ORACLE_NUM_CORES_PROP, ORACLE_NUM_CORES_DEFAULT);
}
public FluoConfiguration setMiniClass(String miniClass) {
return setNonEmptyString(MINI_CLASS_PROP, miniClass);
}
public String getMiniClass() {
return getNonEmptyString(MINI_CLASS_PROP, MINI_CLASS_DEFAULT);
}
/**
* @return A {@link SubsetConfiguration} using the prefix {@value #APP_PREFIX}. Any change made to
* subset will be reflected in this configuration, but with the prefix added. This method
* is useful for setting application configuration before initialization. For reading
* application configration after initialization, see
* {@link FluoClient#getAppConfiguration()}
*/
public Configuration getAppConfiguration() {
return subset(APP_PREFIX);
}
/**
* Base64 encodes yaml and stores it in metrics yaml base64 property
*
* @param in yaml input
*/
public void setMetricsYaml(InputStream in) {
byte[] data = new byte[4096];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int len;
try {
while ((len = in.read(data)) > 0) {
baos.write(data, 0, len);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
setProperty(METRICS_YAML_BASE64, DatatypeConverter.printBase64Binary(baos.toByteArray())
.replace("\n", ""));
}
/**
* This property must be base64 encoded. If you have raw yaml, then consider using
* {@link #setMetricsYaml(InputStream)} which will do the base64 encoding for you.
*
* @param base64Yaml A base64 encoded yaml metrics config.
*/
public FluoConfiguration setMetricsYamlBase64(String base64Yaml) {
return setNonEmptyString(METRICS_YAML_BASE64, base64Yaml);
}
/**
* Consider using {@link #getMetricsYaml()} wich will automatically decode the base 64 value of
* this property.
*
* @return base64 encoded yaml metrics config.
*/
public String getMetricsYamlBase64() {
return getNonEmptyString(METRICS_YAML_BASE64, METRICS_YAML_BASE64_DEFAULT);
}
/**
* This method will decode the base64 yaml metrics config.
*
* @return stream that can be used to read yaml
*/
public InputStream getMetricsYaml() {
return new ByteArrayInputStream(DatatypeConverter.parseBase64Binary(getMetricsYamlBase64()));
}
public FluoConfiguration setMiniStartAccumulo(boolean startAccumulo) {
setProperty(MINI_START_ACCUMULO_PROP, startAccumulo);
return this;
}
public boolean getMiniStartAccumulo() {
return getBoolean(MINI_START_ACCUMULO_PROP, MINI_START_ACCUMULO_DEFAULT);
}
public FluoConfiguration setMiniDataDir(String dataDir) {
return setNonEmptyString(MINI_DATA_DIR_PROP, dataDir);
}
public String getMiniDataDir() {
return getNonEmptyString(MINI_DATA_DIR_PROP, MINI_DATA_DIR_DEFAULT);
}
protected void setDefault(String key, String val) {
if (getProperty(key) == null) {
setProperty(key, val);
}
}
/**
* Logs all properties
*/
public void print() {
Iterator<String> iter = getKeys();
while (iter.hasNext()) {
String key = iter.next();
log.info(key + " = " + getProperty(key));
}
}
private boolean verifyStringPropSet(String key) {
if (containsKey(key) && !getString(key).isEmpty()) {
return true;
}
log.info(key + " is not set");
return false;
}
private boolean verifyStringPropNotSet(String key) {
if (containsKey(key) && !getString(key).isEmpty()) {
log.info(key + " should not be set");
return false;
}
return true;
}
/**
* Returns true if required properties for FluoClient are set
*/
public boolean hasRequiredClientProps() {
boolean valid = true;
valid &= verifyStringPropSet(CLIENT_APPLICATION_NAME_PROP);
valid &= verifyStringPropSet(CLIENT_ACCUMULO_USER_PROP);
valid &= verifyStringPropSet(CLIENT_ACCUMULO_PASSWORD_PROP);
valid &= verifyStringPropSet(CLIENT_ACCUMULO_INSTANCE_PROP);
return valid;
}
/**
* Returns true if required properties for FluoAdmin are set
*/
public boolean hasRequiredAdminProps() {
boolean valid = true;
valid &= hasRequiredClientProps();
valid &= verifyStringPropSet(ADMIN_ACCUMULO_TABLE_PROP);
return valid;
}
/**
* Returns true if required properties for Oracle are set
*/
public boolean hasRequiredOracleProps() {
boolean valid = true;
valid &= hasRequiredClientProps();
return valid;
}
/**
* Returns true if required properties for Worker are set
*/
public boolean hasRequiredWorkerProps() {
boolean valid = true;
valid &= hasRequiredClientProps();
return valid;
}
/**
* Returns true if required properties for MiniFluo are set
*/
public boolean hasRequiredMiniFluoProps() {
boolean valid = true;
if (getMiniStartAccumulo()) {
// ensure that client properties are not set since we are using MiniAccumulo
valid &= verifyStringPropNotSet(CLIENT_ACCUMULO_USER_PROP);
valid &= verifyStringPropNotSet(CLIENT_ACCUMULO_PASSWORD_PROP);
valid &= verifyStringPropNotSet(CLIENT_ACCUMULO_INSTANCE_PROP);
valid &= verifyStringPropNotSet(CLIENT_ACCUMULO_ZOOKEEPERS_PROP);
valid &= verifyStringPropNotSet(CLIENT_ZOOKEEPER_CONNECT_PROP);
if (valid == false) {
log.error("Client properties should not be set in your configuration if MiniFluo is "
+ "configured to start its own accumulo (indicated by "
+ "io.fluo.mini.start.accumulo being set to true)");
}
} else {
valid &= hasRequiredClientProps();
valid &= hasRequiredAdminProps();
valid &= hasRequiredOracleProps();
valid &= hasRequiredWorkerProps();
}
return valid;
}
public Configuration getClientConfiguration() {
Configuration clientConfig = new CompositeConfiguration();
Iterator<String> iter = getKeys();
while (iter.hasNext()) {
String key = iter.next();
if (key.startsWith(CLIENT_PREFIX)) {
clientConfig.setProperty(key, getProperty(key));
}
}
return clientConfig;
}
/**
* Returns configuration with all Fluo properties set to their default. NOTE - some properties do
* not have defaults and will not be set.
*/
public static Configuration getDefaultConfiguration() {
Configuration config = new CompositeConfiguration();
setDefaultConfiguration(config);
return config;
}
/**
* Sets all Fluo properties to their default in the given configuration. NOTE - some properties do
* not have defaults and will not be set.
*/
public static void setDefaultConfiguration(Configuration config) {
config.setProperty(CLIENT_ZOOKEEPER_CONNECT_PROP, CLIENT_ZOOKEEPER_CONNECT_DEFAULT);
config.setProperty(CLIENT_ZOOKEEPER_TIMEOUT_PROP, CLIENT_ZOOKEEPER_TIMEOUT_DEFAULT);
config.setProperty(CLIENT_ACCUMULO_ZOOKEEPERS_PROP, CLIENT_ACCUMULO_ZOOKEEPERS_DEFAULT);
config.setProperty(CLIENT_CLASS_PROP, CLIENT_CLASS_DEFAULT);
config.setProperty(ADMIN_CLASS_PROP, ADMIN_CLASS_DEFAULT);
config.setProperty(WORKER_NUM_THREADS_PROP, WORKER_NUM_THREADS_DEFAULT);
config.setProperty(WORKER_INSTANCES_PROP, WORKER_INSTANCES_DEFAULT);
config.setProperty(WORKER_MAX_MEMORY_MB_PROP, WORKER_MAX_MEMORY_MB_DEFAULT);
config.setProperty(WORKER_NUM_CORES_PROP, WORKER_NUM_CORES_DEFAULT);
config.setProperty(TRANSACTION_ROLLBACK_TIME_PROP, TRANSACTION_ROLLBACK_TIME_DEFAULT);
config.setProperty(LOADER_NUM_THREADS_PROP, LOADER_NUM_THREADS_DEFAULT);
config.setProperty(LOADER_QUEUE_SIZE_PROP, LOADER_QUEUE_SIZE_DEFAULT);
config.setProperty(ORACLE_PORT_PROP, ORACLE_PORT_DEFAULT);
config.setProperty(ORACLE_MAX_MEMORY_MB_PROP, ORACLE_MAX_MEMORY_MB_DEFAULT);
config.setProperty(ORACLE_NUM_CORES_PROP, ORACLE_NUM_CORES_DEFAULT);
config.setProperty(MINI_CLASS_PROP, MINI_CLASS_DEFAULT);
config.setProperty(MINI_START_ACCUMULO_PROP, MINI_START_ACCUMULO_DEFAULT);
config.setProperty(MINI_DATA_DIR_PROP, MINI_DATA_DIR_DEFAULT);
}
private FluoConfiguration setNonNegativeInt(String property, int value) {
Preconditions.checkArgument(value >= 0, property + " must be non-negative");
setProperty(property, value);
return this;
}
private int getNonNegativeInt(String property, int defaultValue) {
int value = getInt(property, defaultValue);
Preconditions.checkArgument(value >= 0, property + " must be non-negative");
return value;
}
private FluoConfiguration setPositiveInt(String property, int value) {
Preconditions.checkArgument(value > 0, property + " must be positive");
setProperty(property, value);
return this;
}
private int getPositiveInt(String property, int defaultValue) {
int value = getInt(property, defaultValue);
Preconditions.checkArgument(value > 0, property + " must be positive");
return value;
}
private FluoConfiguration setPositiveLong(String property, long value) {
Preconditions.checkArgument(value > 0, property + " must be positive");
setProperty(property, value);
return this;
}
private long getPositiveLong(String property, long defaultValue) {
long value = getLong(property, defaultValue);
Preconditions.checkArgument(value > 0, property + " must be positive");
return value;
}
private FluoConfiguration setPort(String property, int value) {
Preconditions.checkArgument(value >= 1 && value <= 65535, property
+ " must be valid port (1-65535)");
setProperty(property, value);
return this;
}
private int getPort(String property, int defaultValue) {
int value = getInt(property, defaultValue);
Preconditions.checkArgument(value >= 1 && value <= 65535, property
+ " must be valid port (1-65535)");
return value;
}
private FluoConfiguration setNonEmptyString(String property, String value) {
Preconditions.checkNotNull(value, property + " cannot be null");
Preconditions.checkArgument(!value.isEmpty(), property + " cannot be empty");
setProperty(property, value);
return this;
}
private String getNonEmptyString(String property, String defaultValue) {
String value = getString(property, defaultValue);
Preconditions.checkNotNull(value, property + " cannot be null");
Preconditions.checkArgument(!value.isEmpty(), property + " cannot be empty");
return value;
}
private String getNonEmptyString(String property) {
String value = getString(property);
Preconditions.checkNotNull(value, property + " cannot be null");
Preconditions.checkArgument(!value.isEmpty(), property + " cannot be empty");
return value;
}
private static String verifyNotNull(String property, String value) {
Preconditions.checkNotNull(value, property + " cannot be null");
return value;
}
}