blob: 6d24ee3d38d8d686ca27284f49b5b7629f663359 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.conf;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.ConfigKey;
import org.apache.tajo.QueryId;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.service.BaseServiceTracker;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.NumberUtil;
import org.apache.tajo.util.datetime.DateTimeConstants;
import org.apache.tajo.validation.ConstraintViolationException;
import org.apache.tajo.validation.Validator;
import org.apache.tajo.validation.Validators;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
public class TajoConf extends Configuration {
private static int DATE_ORDER = -1;
private static final Map<String, ConfVars> vars = new HashMap<>();
static {
Configuration.addDefaultResource("catalog-default.xml");
Configuration.addDefaultResource("catalog-site.xml");
Configuration.addDefaultResource("storage-default.xml");
Configuration.addDefaultResource("storage-site.xml");
Configuration.addDefaultResource("tajo-default.xml");
Configuration.addDefaultResource("tajo-site.xml");
for (ConfVars confVars: ConfVars.values()) {
vars.put(confVars.keyname(), confVars);
}
}
private static final String EMPTY_VALUE = "";
public TajoConf() {
super();
}
public TajoConf(Configuration conf) {
super(conf);
}
public TajoConf(Path path) {
super();
addResource(path);
}
public TimeZone getSystemTimezone() {
return TimeZone.getTimeZone(getVar(ConfVars.$TIMEZONE));
}
public void setSystemTimezone(TimeZone timezone) {
setVar(ConfVars.$TIMEZONE, timezone.getID());
}
public static int getDateOrder() {
if (DATE_ORDER < 0) {
TajoConf tajoConf = new TajoConf();
String dateOrder = tajoConf.getVar(ConfVars.$DATE_ORDER);
if ("YMD".equals(dateOrder)) {
DATE_ORDER = DateTimeConstants.DATEORDER_YMD;
} else if ("DMY".equals(dateOrder)) {
DATE_ORDER = DateTimeConstants.DATEORDER_DMY;
} else if ("MDY".equals(dateOrder)) {
DATE_ORDER = DateTimeConstants.DATEORDER_MDY;
} else {
DATE_ORDER = DateTimeConstants.DATEORDER_YMD;
}
}
return DATE_ORDER;
}
@VisibleForTesting
public static int setDateOrder(int dateOrder) {
int oldDateOrder = DATE_ORDER;
DATE_ORDER = dateOrder;
return oldDateOrder;
}
public static enum ConfVars implements ConfigKey {
///////////////////////////////////////////////////////////////////////////////////////
// Tajo System Configuration
//
// They are all static configs which are not changed or not overwritten at all.
///////////////////////////////////////////////////////////////////////////////////////
// a username for a running Tajo cluster
ROOT_DIR("tajo.rootdir", "file:///tmp/tajo-${user.name}/",
Validators.groups(Validators.notNull(), Validators.pathUrl())),
USERNAME("tajo.username", "${user.name}", Validators.javaString()),
// Configurable System Directories
WAREHOUSE_DIR("tajo.warehouse.directory", EMPTY_VALUE, Validators.pathUrl()),
STAGING_ROOT_DIR("tajo.staging.directory", "/tmp/tajo-${user.name}/staging", Validators.pathUrl()),
SYSTEM_CONF_PATH("tajo.system-conf.path", EMPTY_VALUE, Validators.pathUrl()),
SYSTEM_CONF_REPLICA_COUNT("tajo.system-conf.replica-count", 20, Validators.min("1")),
// Tajo Master Service Addresses
TAJO_MASTER_UMBILICAL_RPC_ADDRESS("tajo.master.umbilical-rpc.address", "localhost:26001",
Validators.networkAddr()),
TAJO_MASTER_CLIENT_RPC_ADDRESS("tajo.master.client-rpc.address", "localhost:26002", Validators.networkAddr()),
TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080", Validators.networkAddr()),
// Resource tracker service
RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "0.0.0.0:26003", Validators.networkAddr()),
RESOURCE_TRACKER_HEARTBEAT_TIMEOUT("tajo.resource-tracker.heartbeat.timeout-secs", 120), // seconds
// Tajo Rest Service
REST_SERVICE_ADDRESS("tajo.rest.service.address", "0.0.0.0:26880", Validators.networkAddr()),
// Catalog
CATALOG_ADDRESS("tajo.catalog.client-rpc.address", "0.0.0.0:26005", Validators.networkAddr()),
// High availability configurations
TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()),
TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec
TAJO_MASTER_HA_CLIENT_RETRY_MAX_NUM("tajo.master.ha.client.read.retry.max-num", 120), // 120 retry
TAJO_MASTER_HA_CLIENT_RETRY_PAUSE_TIME("tajo.master.ha.client.read.pause-time", 500), // 500 ms
// Service discovery
DEFAULT_SERVICE_TRACKER_CLASS("tajo.discovery.service-tracker.class", BaseServiceTracker.class.getCanonicalName()),
HA_SERVICE_TRACKER_CLASS("tajo.discovery.ha-service-tracker.class", "org.apache.tajo.ha.HdfsServiceTracker"),
// Async IO Task Service
/** The number of threads for async tasks */
MASTER_ASYNC_TASK_THREAD_NUM("tajo.master.async-task.thread-num", 4),
/** How long it will wait for termination */
MASTER_ASYNC_TASK_TERMINATION_WAIT_TIME("tajo.master.async-task.wait-time-sec", 60), // 1 min
// QueryMaster resource
QUERYMASTER_MINIMUM_MEMORY("tajo.qm.resource.min.memory-mb", 500, Validators.min("64")),
// Worker task resource
TASK_RESOURCE_MINIMUM_MEMORY("tajo.task.resource.min.memory-mb", 1000, Validators.min("64")),
// Tajo Worker Service Addresses
WORKER_INFO_ADDRESS("tajo.worker.info-http.address", "0.0.0.0:28080", Validators.networkAddr()),
WORKER_QM_INFO_ADDRESS("tajo.worker.qm-info-http.address", "0.0.0.0:28081", Validators.networkAddr()),
WORKER_PEER_RPC_ADDRESS("tajo.worker.peer-rpc.address", "0.0.0.0:28091", Validators.networkAddr()),
WORKER_CLIENT_RPC_ADDRESS("tajo.worker.client-rpc.address", "0.0.0.0:28092", Validators.networkAddr()),
WORKER_QM_RPC_ADDRESS("tajo.worker.qm-rpc.address", "0.0.0.0:28093", Validators.networkAddr()),
// Tajo Worker Temporal Directories
WORKER_TEMPORAL_DIR("tajo.worker.tmpdir.locations", "/tmp/tajo-${user.name}/tmpdir", Validators.pathUrlList()),
WORKER_TEMPORAL_DIR_CLEANUP("tajo.worker.tmpdir.cleanup-at-startup", false, Validators.bool()),
// Tajo Worker Resources
WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores",
Math.max(Runtime.getRuntime().availableProcessors(), 2), Validators.min("2")), // 1qm + 1task container
WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1500, Validators.min("64")),
WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM("tajo.worker.resource.disk.parallel-execution.num", 2,
Validators.min("1")),
WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE("tajo.worker.heartbeat.queue.threshold-rate", 0.1f, Validators.min("0")),//10%
WORKER_HEARTBEAT_IDLE_INTERVAL("tajo.worker.heartbeat.idle.interval", 10 * 1000), // 10 sec
WORKER_HEARTBEAT_ACTIVE_INTERVAL("tajo.worker.heartbeat.active.interval", 1000), // 1 sec
//Default query scheduler
RESOURCE_SCHEDULER_CLASS("tajo.resource.scheduler", "org.apache.tajo.master.scheduler.SimpleScheduler",
Validators.groups(Validators.notNull(), Validators.clazz())),
QUERYMASTER_TASK_SCHEDULER_DELAY("tajo.qm.task-scheduler.delay", 50), // 50 ms
QUERYMASTER_TASK_SCHEDULER_REQUEST_MAX_NUM("tajo.qm.task-scheduler.request.max-num", 50),
// Query Configuration
QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60, Validators.min("0")),
QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 0, Validators.min("0")),
// Shuffle Configuration --------------------------------------------------
PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
PULLSERVER_CACHE_SIZE("tajo.pullserver.index-cache.size", 10000, Validators.min("1")),
PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")),
PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB,
Validators.min("1")),
YARN_SHUFFLE_SERVICE_ENABLED("tajo.shuffle.yarn-service.enabled", false, Validators.bool()),
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()),
SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
2, Validators.min("1")),
SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192),
SHUFFLE_FETCHER_CONNECT_TIMEOUT("tajo.shuffle.fetcher.connect.timeout-sec", 60, Validators.min("1")),
SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 60, Validators.min("1")),
SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 2, Validators.min("0")),
SHUFFLE_HASH_APPENDER_PAGE_VOLUME("tajo.shuffle.hash.appender.page.volume-mb", 30),
SHUFFLE_HASH_PARENT_DIRS("tajo.shuffle.hash.parent.dirs.count", 64),
// Query output Configuration --------------------------------------------------
QUERY_OUTPUT_DEFAULT_FILE_FORMAT("tajo.query.output.file-format", BuiltinStorages.DRAW, Validators.javaString()),
// Storage Configuration --------------------------------------------------
ROWFILE_SYNC_INTERVAL("rowfile.sync.interval", 100),
MINIMUM_SPLIT_SIZE("tajo.min.split.size", 32 * StorageUnit.MB, Validators.min("1")),
// for RCFile
HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true, Validators.bool()),
// RPC --------------------------------------------------------------------
// Internal RPC Client
INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 2),
RPC_CLIENT_RETRY_NUM("tajo.rpc.client.retry-num", 3, Validators.min("1")),
RPC_CLIENT_CONNECTION_TIMEOUT("tajo.rpc.client.connection-timeout-ms", (long)15 * 1000, Validators.min("0")),
RPC_CLIENT_SOCKET_TIMEOUT("tajo.rpc.client.socket-timeout-ms", (long)180 * 1000, Validators.min("0")),
RPC_CLIENT_HANG_DETECTION_ENABLED("tajo.rpc.client.hang-detection", true, Validators.bool()),
// Internal RPC Server
MASTER_RPC_SERVER_WORKER_THREAD_NUM("tajo.master.rpc.server.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 2),
QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM("tajo.querymaster.rpc.server.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 2),
WORKER_RPC_SERVER_WORKER_THREAD_NUM("tajo.worker.rpc.server.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 2),
CATALOG_RPC_SERVER_WORKER_THREAD_NUM("tajo.catalog.rpc.server.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 2),
SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM("tajo.shuffle.rpc.server.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 2),
// Client RPC
RPC_CLIENT_WORKER_THREAD_NUM("tajo.rpc.client.worker-thread-num", 4),
SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM("tajo.shuffle.rpc.client.worker-thread-num",
Runtime.getRuntime().availableProcessors()),
//Client service RPC Server
MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.master.service.rpc.server.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 1),
WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.worker.service.rpc.server.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 1),
REST_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.rest.service.rpc.server.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 1),
// Task Configuration -----------------------------------------------------
TASK_DEFAULT_SIZE("tajo.task.size-mb", 128),
// Query and Optimization -------------------------------------------------
// This class provides a ordered list of logical plan rewrite rule classes.
LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS("tajo.plan.logical.rewriter.provider",
"org.apache.tajo.plan.rewrite.BaseLogicalPlanRewriteRuleProvider"),
// This class provides a ordered list of global plan rewrite rule classes.
GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS("tajo.plan.global.rewriter.provider",
"org.apache.tajo.engine.planner.global.rewriter.BaseGlobalPlanRewriteRuleProvider"),
EXECUTOR_EXTERNAL_SORT_THREAD_NUM("tajo.executor.external-sort.thread-num", 1),
EXECUTOR_EXTERNAL_SORT_FANOUT("tajo.executor.external-sort.fanout-num", 8),
// Metrics ----------------------------------------------------------------
METRICS_PROPERTY_FILENAME("tajo.metrics.property.file", "tajo-metrics.properties"),
// Query History ---------------------------------------------------------
HISTORY_QUERY_DIR("tajo.history.query.dir", STAGING_ROOT_DIR.defaultVal + "/history"),
HISTORY_TASK_DIR("tajo.history.task.dir", "file:///tmp/tajo-${user.name}/history"),
HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7),
HISTORY_QUERY_CACHE_SIZE("tajo.history.cache.size", 100, Validators.min("0")),
// Misc -------------------------------------------------------------------
// Fragment
// When making physical plan, the length of fragment is used to determine the physical operation.
// Some storage does not know the size of the fragment.
// In this case PhysicalPlanner uses this value to determine.
FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH("tajo.fragment.alternative.unknown.length", (long)(512 * 1024 * 1024)),
// Geo IP
GEOIP_DATA("tajo.function.geoip-database-location", ""),
// Python UDF
PYTHON_CODE_DIR("tajo.function.python.code-dir", ""),
PYTHON_CONTROLLER_LOG_DIR("tajo.function.python.controller.log-dir", ""),
// HIVE UDF
HIVE_UDF_JAR_DIR("tajo.function.hive.jar-dir", "./lib/hiveudf"),
// Partition
PARTITION_DYNAMIC_BULK_INSERT_BATCH_SIZE("tajo.partition.dynamic.bulk-insert.batch-size", 1000),
/////////////////////////////////////////////////////////////////////////////////
// User Session Configuration
//
// All session variables begin with dollar($) sign. They are default configs
// for session variables. Do not directly use the following configs. Instead,
// please use QueryContext in order to access session variables.
//
// Also, users can change the default values of session variables in tajo-site.xml.
/////////////////////////////////////////////////////////////////////////////////
$EMPTY("tajo._", ""),
// Query and Optimization ---------------------------------------------------
// Enables the optimizer to get and use table volumes via storage handlers.
// This feature may cause some performance degradation when storage access is too slow (S3).
// By default, this config value is false, and in this case the optimizer uses the table stats from catalog.
$USE_TABLE_VOLUME("tajo.optimizer.stats.use-table-volume", Boolean.FALSE),
// for distributed query strategies
$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.non-cross-join.threshold-kb", 5 * 1024l,
Validators.min("0")), // 5 MB
$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.cross-join.threshold-kb", 1 * 1024l,
Validators.min("0")), // 1 MB
$DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 64),
$DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 64),
$DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 64),
$DIST_QUERY_JOIN_PARTITION_VOLUME("tajo.dist-query.join.partition-volume-mb", 128, Validators.min("1")),
$DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256, Validators.min("1")),
$DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256, Validators.min("1")),
$GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", true),
$QUERY_EXECUTE_PARALLEL_MAX("tajo.query.execute.parallel.max", 10),
// for physical Executors
$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200),
$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-mb", 64l, Validators.min("0")),
$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-mb", 64l,
Validators.min("0")),
$EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-mb", 64l,
Validators.min("0")),
$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-mb", 64l,
Validators.min("0")),
$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE("tajo.executor.hash-shuffle.buffer-mb", 100, Validators.min("1")),
$MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite
$CODEGEN("tajo.executor.codegen.enabled", false), // Runtime code generation (todo this is broken)
$AGG_HASH_TABLE_SIZE("tajo.executor.aggregate.hash-table.size", 10000),
$SORT_LIST_SIZE("tajo.executor.sort.list.size", 100000),
$JOIN_HASH_TABLE_SIZE("tajo.executor.join.hash-table.size", 100000),
$SORT_ALGORITHM("tajo.executor.sort.algorithm", "TIM"),
// for index
$INDEX_ENABLED("tajo.query.index.enabled", false),
$INDEX_SELECTIVITY_THRESHOLD("tajo.query.index.selectivity.threshold", 0.05f),
// Client -----------------------------------------------------------------
$CLIENT_SESSION_EXPIRY_TIME("tajo.client.session.expiry-time-sec", 3600), // default time is one hour.
// Command line interface and its behavior --------------------------------
$CLI_MAX_COLUMN("tajo.cli.max_columns", 120),
$CLI_NULL_CHAR("tajo.cli.nullchar", ""),
$CLI_PRINT_PAUSE_NUM_RECORDS("tajo.cli.print.pause.num.records", 100),
$CLI_PRINT_PAUSE("tajo.cli.print.pause", true),
$CLI_PRINT_ERROR_TRACE("tajo.cli.print.error.trace", true),
$CLI_OUTPUT_FORMATTER_CLASS("tajo.cli.output.formatter", "org.apache.tajo.cli.tsql.DefaultTajoCliOutputFormatter"),
$CLI_ERROR_STOP("tajo.cli.error.stop", false),
// Timezone & Date ----------------------------------------------------------
$TIMEZONE("tajo.timezone", TimeZone.getDefault().getID()),
$DATE_ORDER("tajo.datetime.date-order", "YMD"),
// null character for text file output
$TEXT_NULL("tajo.text.null", NullDatum.DEFAULT_TEXT),
// Only for Debug and Testing
$DEBUG_ENABLED(TajoConstants.DEBUG_KEY, false),
$TEST_MODE(TajoConstants.TEST_KEY, false),
$TEST_BROADCAST_JOIN_ENABLED("tajo.dist-query.join.auto-broadcast", true),
$TEST_JOIN_OPT_ENABLED("tajo.test.plan.join-optimization.enabled", true),
$TEST_FILTER_PUSHDOWN_ENABLED("tajo.test.plan.filter-pushdown.enabled", true),
$TEST_MIN_TASK_NUM("tajo.test.min-task-num", -1),
$TEST_PLAN_SHAPE_FIX_ENABLED("tajo.test.plan.shape.fix.enabled", false), // used for explain statement test
$TEST_TIM_SORT_THRESHOLD_FOR_RADIX_SORT("tajo.test.executor.radix-sort.tim-sort-threshold", 65536),
// Behavior Control ---------------------------------------------------------
$BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false),
// If True, a partitioned table is overwritten even if a sub query leads to no result.
// Otherwise, the table data will be kept if there is no result
$PARTITION_NO_RESULT_OVERWRITE_ENABLED("tajo.partition.overwrite.even-if-no-result", false),
// ResultSet ---------------------------------------------------------
$RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200),
$RESULT_SET_BLOCK_WAIT("tajo.resultset.block.wait", true),
$COMPRESSED_RESULT_TRANSFER("tajo.resultset.compression", false),
;
public final String varname;
public final String defaultVal;
public final int defaultIntVal;
public final long defaultLongVal;
public final float defaultFloatVal;
public final Class<?> valClass;
public final boolean defaultBoolVal;
private final VarType type;
private Validator validator;
ConfVars(String varname, String defaultVal) {
this.varname = varname;
this.valClass = String.class;
this.defaultVal = defaultVal;
this.defaultIntVal = -1;
this.defaultLongVal = -1;
this.defaultFloatVal = -1;
this.defaultBoolVal = false;
this.type = VarType.STRING;
}
ConfVars(String varname, String defaultVal, Validator validator) {
this(varname, defaultVal);
this.validator = validator;
}
ConfVars(String varname, int defaultIntVal) {
this.varname = varname;
this.valClass = Integer.class;
this.defaultVal = Integer.toString(defaultIntVal);
this.defaultIntVal = defaultIntVal;
this.defaultLongVal = -1;
this.defaultFloatVal = -1;
this.defaultBoolVal = false;
this.type = VarType.INT;
}
ConfVars(String varname, int defaultIntVal, Validator validator) {
this(varname, defaultIntVal);
this.validator = validator;
}
ConfVars(String varname, long defaultLongVal) {
this.varname = varname;
this.valClass = Long.class;
this.defaultVal = Long.toString(defaultLongVal);
this.defaultIntVal = -1;
this.defaultLongVal = defaultLongVal;
this.defaultFloatVal = -1;
this.defaultBoolVal = false;
this.type = VarType.LONG;
}
ConfVars(String varname, long defaultLongVal, Validator validator) {
this(varname, defaultLongVal);
this.validator = validator;
}
ConfVars(String varname, float defaultFloatVal) {
this.varname = varname;
this.valClass = Float.class;
this.defaultVal = Float.toString(defaultFloatVal);
this.defaultIntVal = -1;
this.defaultLongVal = -1;
this.defaultFloatVal = defaultFloatVal;
this.defaultBoolVal = false;
this.type = VarType.FLOAT;
}
ConfVars(String varname, float defaultFloatVal, Validator validator) {
this(varname, defaultFloatVal);
this.validator = validator;
}
ConfVars(String varname, boolean defaultBoolVal) {
this.varname = varname;
this.valClass = Boolean.class;
this.defaultVal = Boolean.toString(defaultBoolVal);
this.defaultIntVal = -1;
this.defaultLongVal = -1;
this.defaultFloatVal = -1;
this.defaultBoolVal = defaultBoolVal;
this.type = VarType.BOOLEAN;
}
ConfVars(String varname, boolean defaultBoolVal, Validator validator) {
this(varname, defaultBoolVal);
this.validator = validator;
}
enum VarType {
STRING { void checkType(String value) throws Exception { } },
INT { void checkType(String value) throws Exception { Integer.valueOf(value); } },
LONG { void checkType(String value) throws Exception { Long.valueOf(value); } },
FLOAT { void checkType(String value) throws Exception { Float.valueOf(value); } },
BOOLEAN { void checkType(String value) throws Exception { Boolean.valueOf(value); } };
boolean isType(String value) {
try { checkType(value); } catch (Exception e) { return false; }
return true;
}
String typeString() { return name().toUpperCase();}
abstract void checkType(String value) throws Exception;
}
@Override
public String keyname() {
return varname;
}
@Override
public ConfigType type() {
return ConfigType.SYSTEM;
}
@Override
public Class<?> valueClass() {
return valClass;
}
@Override
public Validator validator() {
return validator;
}
}
public static int getIntVar(Configuration conf, ConfVars var) {
assert (var.valClass == Integer.class);
return conf.getInt(var.varname, var.defaultIntVal);
}
public static void setIntVar(Configuration conf, ConfVars var, int val) {
assert (var.valClass == Integer.class);
conf.setInt(var.varname, val);
}
public int getIntVar(ConfVars var) {
return getIntVar(this, var);
}
public void setIntVar(ConfVars var, int val) {
setIntVar(this, var, val);
}
public static long getLongVar(Configuration conf, ConfVars var) {
assert (var.valClass == Long.class || var.valClass == Integer.class || var.valClass == Float.class);
if (var.valClass == Integer.class) {
return conf.getInt(var.varname, var.defaultIntVal);
} else {
return conf.getLong(var.varname, var.defaultLongVal);
}
}
public static long getLongVar(Configuration conf, ConfVars var, long defaultVal) {
return conf.getLong(var.varname, defaultVal);
}
public static void setLongVar(Configuration conf, ConfVars var, long val) {
assert (var.valClass == Long.class);
conf.setLong(var.varname, val);
}
public long getLongVar(ConfVars var) {
return getLongVar(this, var);
}
public void setLongVar(ConfVars var, long val) {
setLongVar(this, var, val);
}
public static float getFloatVar(Configuration conf, ConfVars var) {
assert (var.valClass == Float.class);
return conf.getFloat(var.varname, var.defaultFloatVal);
}
public static float getFloatVar(Configuration conf, ConfVars var, float defaultVal) {
return conf.getFloat(var.varname, defaultVal);
}
public static void setFloatVar(Configuration conf, ConfVars var, float val) {
assert (var.valClass == Float.class);
conf.setFloat(var.varname, val);
}
public float getFloatVar(ConfVars var) {
return getFloatVar(this, var);
}
public void setFloatVar(ConfVars var, float val) {
setFloatVar(this, var, val);
}
public static boolean getBoolVar(Configuration conf, ConfVars var) {
assert (var.valClass == Boolean.class);
return conf.getBoolean(var.varname, var.defaultBoolVal);
}
public static boolean getBoolVar(Configuration conf, ConfVars var, boolean defaultVal) {
return conf.getBoolean(var.varname, defaultVal);
}
public static void setBoolVar(Configuration conf, ConfVars var, boolean val) {
assert (var.valClass == Boolean.class);
conf.setBoolean(var.varname, val);
}
public boolean getBoolVar(ConfVars var) {
return getBoolVar(this, var);
}
public void setBoolVar(ConfVars var, boolean val) {
setBoolVar(this, var, val);
}
// borrowed from HIVE-5799
public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) {
return toTime(getVar(conf, var), outUnit);
}
public static void setTimeVar(Configuration conf, ConfVars var, long time, TimeUnit timeunit) {
assert (var.valClass == String.class) : var.varname;
conf.set(var.varname, time + stringFor(timeunit));
}
public long getTimeVar(ConfVars var, TimeUnit outUnit) {
return getTimeVar(this, var, outUnit);
}
public void setTimeVar(ConfVars var, long time, TimeUnit outUnit) {
setTimeVar(this, var, time, outUnit);
}
public static long toTime(String value, TimeUnit outUnit) {
String[] parsed = parseTime(value.trim());
return outUnit.convert(Long.valueOf(parsed[0].trim()), unitFor(parsed[1].trim()));
}
private static String[] parseTime(String value) {
char[] chars = value.toCharArray();
int i = 0;
for (; i < chars.length && (chars[i] == '-' || Character.isDigit(chars[i])); i++) {
}
return new String[] {value.substring(0, i), value.substring(i)};
}
public static TimeUnit unitFor(String unit) {
unit = unit.trim().toLowerCase();
if (unit.isEmpty() || unit.equals("l")) {
return TimeUnit.MILLISECONDS;
} else if (unit.equals("d") || unit.startsWith("day")) {
return TimeUnit.DAYS;
} else if (unit.equals("h") || unit.startsWith("hour")) {
return TimeUnit.HOURS;
} else if (unit.equals("m") || unit.startsWith("min")) {
return TimeUnit.MINUTES;
} else if (unit.equals("s") || unit.startsWith("sec")) {
return TimeUnit.SECONDS;
} else if (unit.equals("ms") || unit.startsWith("msec")) {
return TimeUnit.MILLISECONDS;
} else if (unit.equals("us") || unit.startsWith("usec")) {
return TimeUnit.MICROSECONDS;
} else if (unit.equals("ns") || unit.startsWith("nsec")) {
return TimeUnit.NANOSECONDS;
}
throw new IllegalArgumentException("Invalid time unit " + unit);
}
public static String stringFor(TimeUnit timeunit) {
switch (timeunit) {
case DAYS: return "day";
case HOURS: return "hour";
case MINUTES: return "min";
case SECONDS: return "sec";
case MILLISECONDS: return "msec";
case MICROSECONDS: return "usec";
case NANOSECONDS: return "nsec";
}
throw new IllegalArgumentException("Invalid timeunit " + timeunit);
}
public void setClassVar(ConfVars var, Class<?> clazz) {
setVar(var, clazz.getCanonicalName());
}
public Class<?> getClassVar(ConfVars var) {
String valueString = getVar(var);
try {
return getClassByName(valueString);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
public static String getVar(Configuration conf, ConfVars var) {
return conf.get(var.varname, var.defaultVal);
}
public static String getVar(Configuration conf, ConfVars var, String defaultVal) {
return conf.get(var.varname, defaultVal);
}
public static void setVar(Configuration conf, ConfVars var, String val) {
assert (var.valClass == String.class);
conf.set(var.varname, val);
}
public static ConfVars getConfVars(String name) {
return vars.get(name);
}
public String getVar(ConfVars var) {
return getVar(this, var);
}
public void setVar(ConfVars var, String val) {
setVar(this, var, val);
}
public void logVars(PrintStream ps) {
for (ConfVars one : ConfVars.values()) {
ps.println(one.varname + "=" + ((get(one.varname) != null) ? get(one.varname) : ""));
}
}
public InetSocketAddress getSocketAddrVar(ConfVars var) {
final String address = getVar(var);
return NetUtils.createSocketAddr(address);
}
/**
* Returns InetSocketAddress that a client can use to connect to the server.
* If the configured address is any local address(”0.0.0.0”), finds default host in defaultVar.
*
* @param var
* @param defaultVar
* @return InetSocketAddress
*/
public InetSocketAddress getSocketAddrVar(ConfVars var, ConfVars defaultVar) {
InetSocketAddress addr = NetUtils.createSocketAddr(getVar(var));
if (addr.getAddress().isAnyLocalAddress()) {
InetSocketAddress defaultAddr = NetUtils.createSocketAddr(getVar(defaultVar));
addr = NetUtils.createSocketAddr(defaultAddr.getHostName(), addr.getPort());
}
return addr;
}
/////////////////////////////////////////////////////////////////////////////
// Tajo System Specific Methods
/////////////////////////////////////////////////////////////////////////////
public static Path getTajoRootDir(TajoConf conf) {
String rootPath = conf.getVar(ConfVars.ROOT_DIR);
Preconditions.checkNotNull(rootPath,
ConfVars.ROOT_DIR.varname + " must be set before a Tajo Cluster starts up");
FileSystem fs;
try {
fs = FileSystem.get(conf);
} catch (IOException e) {
throw new TajoInternalError(e);
}
return fs.makeQualified(new Path(rootPath));
}
public static Path getWarehouseDir(TajoConf conf) {
String warehousePath = conf.getVar(ConfVars.WAREHOUSE_DIR);
if (warehousePath == null || warehousePath.equals("")) {
Path rootDir = getTajoRootDir(conf);
warehousePath = new Path(rootDir, TajoConstants.WAREHOUSE_DIR_NAME).toUri().toString();
conf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath);
return new Path(warehousePath);
} else {
return new Path(warehousePath);
}
}
public static Path getSystemDir(TajoConf conf) {
Path rootPath = getTajoRootDir(conf);
return new Path(rootPath, TajoConstants.SYSTEM_DIR_NAME);
}
public static Path getSystemResourceDir(TajoConf conf) {
return new Path(getSystemDir(conf), TajoConstants.SYSTEM_RESOURCE_DIR_NAME);
}
public static Path getSystemHADir(TajoConf conf) {
return new Path(getSystemDir(conf), TajoConstants.SYSTEM_HA_DIR_NAME);
}
private static boolean hasScheme(String path) {
return path.indexOf("file:/") == 0 || path.indexOf("hdfs:/") == 0;
}
/**
* It returns the default root staging directory used by queries without a target table or
* a specified output directory. An example query is <pre>SELECT a,b,c FROM XXX;</pre>.
*
* @param conf TajoConf
* @return Path which points the default staging directory
* @throws IOException
*/
public static Path getDefaultRootStagingDir(TajoConf conf) throws IOException {
String stagingDirString = conf.getVar(ConfVars.STAGING_ROOT_DIR);
if (!hasScheme(stagingDirString)) {
Path warehousePath = getWarehouseDir(conf);
FileSystem fs = warehousePath.getFileSystem(conf);
Path path = new Path(fs.getUri().toString(), stagingDirString);
conf.setVar(ConfVars.STAGING_ROOT_DIR, path.toString());
return path;
}
return new Path(stagingDirString);
}
/**
* It returns the temporal query directory
* An example dir is <pre>/{staging-dir}/{queryId}/RESULT</pre>.
*
* @param conf TajoConf
* @param queryId queryId
* @throws IOException
*/
public static Path getTemporalResultDir(TajoConf conf, QueryId queryId) throws IOException {
Path queryDir = new Path(getDefaultRootStagingDir(conf), queryId.toString());
return new Path(queryDir, TajoConstants.RESULT_DIR_NAME);
}
public static Path getQueryHistoryDir(TajoConf conf) throws IOException {
String historyDirString = conf.getVar(ConfVars.HISTORY_QUERY_DIR);
if (!hasScheme(historyDirString)) {
Path stagingPath = getDefaultRootStagingDir(conf);
FileSystem fs = stagingPath.getFileSystem(conf);
Path path = new Path(fs.getUri().toString(), historyDirString);
conf.setVar(ConfVars.HISTORY_QUERY_DIR, path.toString());
return path;
}
return new Path(historyDirString);
}
public static Path getTaskHistoryDir(TajoConf conf) throws IOException {
String historyDirString = conf.getVar(ConfVars.HISTORY_TASK_DIR);
if (!hasScheme(historyDirString)) {
//Local dir
historyDirString = "file://" + historyDirString;
}
return new Path(historyDirString);
}
public static Path getSystemConfPath(TajoConf conf) {
String systemConfPathStr = conf.getVar(ConfVars.SYSTEM_CONF_PATH);
if (systemConfPathStr == null || systemConfPathStr.equals("")) {
Path systemResourcePath = getSystemResourceDir(conf);
Path systemConfPath = new Path(systemResourcePath, TajoConstants.SYSTEM_CONF_FILENAME);
conf.setVar(ConfVars.SYSTEM_CONF_PATH, systemConfPath.toString());
return systemConfPath;
} else {
return new Path(systemConfPathStr);
}
}
/**
* validateProperty function will fetch pre-defined configuration property by keyname.
* If found, it will validate the supplied value with these validators.
*
* @param name - a string containing specific key
* @param value - a string containing value
* @throws ConstraintViolationException
*/
public void validateProperty(String name, String value) throws ConstraintViolationException {
ConfigKey configKey = null;
configKey = TajoConf.getConfVars(name);
if (configKey == null) {
configKey = SessionVars.get(name);
}
if (configKey != null && configKey.validator() != null && configKey.valueClass() != null) {
Object valueObj = value;
if (Number.class.isAssignableFrom(configKey.valueClass())) {
valueObj = NumberUtil.numberValue(configKey.valueClass(), value);
if (valueObj == null) {
return;
}
}
configKey.validator().validate(valueObj, true);
}
}
}