blob: 8de4cb9fa35406052e7ee88d9bf0ed2dc23fea6c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo;
import com.google.common.base.Charsets;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.tajo.catalog.*;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.client.TajoClientUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
import org.apache.tajo.exception.UnsupportedCatalogStore;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider;
import org.apache.tajo.querymaster.Query;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.querymaster.Stage;
import org.apache.tajo.querymaster.StageState;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.storage.FileTablespace;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.history.QueryHistory;
import org.apache.tajo.worker.TajoWorker;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
import java.util.UUID;
public class TajoTestingCluster {
private static Log LOG = LogFactory.getLog(TajoTestingCluster.class);
private TajoConf conf;
private FileSystem defaultFS;
private MiniDFSCluster dfsCluster;
private CatalogServer catalogServer;
private HBaseTestClusterUtil hbaseUtil;
private TajoMaster tajoMaster;
private List<TajoWorker> tajoWorkers = new ArrayList<>();
private boolean isDFSRunning = false;
private boolean isTajoClusterRunning = false;
private boolean isCatalogServerRunning = false;
private File clusterTestBuildDir = null;
/**
* Default parent directory for test output.
*/
public static final String DEFAULT_TEST_DIRECTORY = "target/" +
System.getProperty("tajo.test.data.dir", "test-data");
/**
* True If HiveCatalogStore is used. Otherwise, it is FALSE.
*/
public Boolean isHiveCatalogStoreUse = false;
private static final String LOG_LEVEL;
static {
LOG_LEVEL = System.getProperty("LOG_LEVEL");
}
public TajoTestingCluster() {
this(false);
}
public TajoTestingCluster(boolean masterHaEMode) {
this.conf = new TajoConf();
this.conf.setBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE, masterHaEMode);
initTestDir();
setTestingFlagProperties();
initPropertiesAndConfigs();
}
void setTestingFlagProperties() {
System.setProperty(TajoConstants.TEST_KEY, Boolean.TRUE.toString());
conf.set(TajoConstants.TEST_KEY, Boolean.TRUE.toString());
}
void initPropertiesAndConfigs() {
// Injection of equality testing code of logical plan (de)serialization
conf.setClassVar(ConfVars.LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, LogicalPlanTestRuleProvider.class);
conf.setClassVar(ConfVars.GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, GlobalPlanTestRuleProvider.class);
conf.setLongVar(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD, 1024 * 1024); // 1GB
conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES.varname, 4);
conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2000);
conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM.varname, 3);
conf.setInt(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM.varname, 1);
// Client API RPC
conf.setIntVar(ConfVars.RPC_CLIENT_WORKER_THREAD_NUM, 2);
//Client API service RPC Server
conf.setIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(ConfVars.REST_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
// Internal RPC Client
conf.setIntVar(ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM, 2);
conf.setIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM, 2);
// Internal RPC Server
conf.setIntVar(ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2);
// Memory cache termination
conf.setIntVar(ConfVars.HISTORY_QUERY_CACHE_SIZE, 10);
// Python function path
conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString());
// Buffer size
conf.setInt(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE.varname, 1);
conf.setInt(ConfVars.$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE.varname, 1);
/* decrease Hbase thread and memory cache for testing */
//server handler
conf.setInt("hbase.regionserver.handler.count", 5);
//client handler
conf.setInt("hbase.hconnection.threads.core", 5);
conf.setInt("hbase.hconnection.threads.max", 10);
conf.setInt("hbase.hconnection.meta.lookup.threads.core", 5);
conf.setInt("hbase.hconnection.meta.lookup.threads.max", 10);
//memory cache
conf.setFloat("hfile.block.cache.size", 0.0f); //disable cache
conf.setBoolean("hbase.bucketcache.combinedcache.enabled", false);
/* Since Travis CI limits the size of standard output log up to 4MB */
if (!StringUtils.isEmpty(LOG_LEVEL)) {
Level defaultLevel = Logger.getRootLogger().getLevel();
Logger.getLogger("org.apache.tajo").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
Logger.getLogger("org.apache.hadoop").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
Logger.getLogger("org.apache.zookeeper").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
Logger.getLogger("BlockStateChange").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
Logger.getLogger("org.mortbay.log").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel));
}
}
public TajoConf getConfiguration() {
return this.conf;
}
public void initTestDir() {
if (clusterTestBuildDir == null) {
clusterTestBuildDir = setupClusterTestBuildDir();
}
}
/**
* @return Where to write test data on local filesystem; usually
* {@link #DEFAULT_TEST_DIRECTORY}
* @see #setupClusterTestBuildDir()
*/
public File getTestDir() {
return clusterTestBuildDir;
}
/**
* @param subdirName
* @return Path to a subdirectory named <code>subdirName</code> under
* {@link #getTestDir()}.
* @see #setupClusterTestBuildDir()
*/
public static File getTestDir(final String subdirName) {
return new File(new File(DEFAULT_TEST_DIRECTORY), subdirName);
}
public static File setupClusterTestBuildDir() {
String randomStr = UUID.randomUUID().toString();
String dirStr = getTestDir(randomStr).toString();
File dir = new File(dirStr).getAbsoluteFile();
// Have it cleaned up on exit
dir.deleteOnExit();
return dir;
}
////////////////////////////////////////////////////////
// HDFS Section
////////////////////////////////////////////////////////
/**
* Start a minidfscluster.
* @param servers How many DNs to start.
* @throws Exception
* @see {@link #shutdownMiniDFSCluster()}
* @return The mini dfs cluster created.
*/
public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
return startMiniDFSCluster(servers, null, null);
}
/**
* Start a minidfscluster.
* Can only create one.
* @param servers How many DNs to start.
* @param dir Where to home your dfs cluster.
* @param hosts hostnames DNs to run on.
* @throws Exception
* @see {@link #shutdownMiniDFSCluster()}
* @return The mini dfs cluster created.
* @throws java.io.IOException
*/
public MiniDFSCluster startMiniDFSCluster(int servers,
File dir,
final String hosts[])
throws IOException {
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dir.getAbsolutePath());
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY, 0);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
builder.hosts(hosts);
builder.numDataNodes(servers);
builder.format(true);
builder.waitSafeMode(true);
this.dfsCluster = builder.build();
// Set this just-started cluster as our filesystem.
this.defaultFS = this.dfsCluster.getFileSystem();
this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo");
isDFSRunning = true;
return this.dfsCluster;
}
public void shutdownMiniDFSCluster() throws Exception {
if (this.dfsCluster != null) {
try {
FileSystem fs = this.dfsCluster.getFileSystem();
if (fs != null) fs.close();
} catch (IOException e) {
System.err.println("error closing file system: " + e);
}
// The below throws an exception per dn, AsynchronousCloseException.
this.dfsCluster.shutdown();
}
}
public boolean isRunningDFSCluster() {
return this.defaultFS != null;
}
public MiniDFSCluster getMiniDFSCluster() {
return this.dfsCluster;
}
public FileSystem getDefaultFileSystem() {
return this.defaultFS;
}
public HBaseTestClusterUtil getHBaseUtil() {
return hbaseUtil;
}
////////////////////////////////////////////////////////
// Catalog Section
////////////////////////////////////////////////////////
public CatalogServer startCatalogCluster() throws Exception {
if(isCatalogServerRunning) throw new IOException("Catalog Cluster already running");
CatalogTestingUtil.configureCatalog(conf, clusterTestBuildDir.getAbsolutePath());
LOG.info("Apache Derby repository is set to " + conf.get(CatalogConstants.CATALOG_URI));
conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
catalogServer = new CatalogServer();
catalogServer.init(conf);
catalogServer.start();
isCatalogServerRunning = true;
return this.catalogServer;
}
public void shutdownCatalogCluster() {
try {
CatalogTestingUtil.shutdownCatalogStore(conf);
} catch (Exception e) {
//ignore
}
if (catalogServer != null) {
this.catalogServer.stop();
}
isCatalogServerRunning = false;
}
public CatalogServer getMiniCatalogCluster() {
return this.catalogServer;
}
public CatalogService getCatalogService() {
return new LocalCatalogWrapper(catalogServer);
}
public boolean isHiveCatalogStoreRunning() {
return isHiveCatalogStoreUse;
}
////////////////////////////////////////////////////////
// Tajo Cluster Section
////////////////////////////////////////////////////////
private void startMiniTajoCluster(File testBuildDir,
final int numSlaves,
boolean local) throws Exception {
TajoConf c = getConfiguration();
c.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, "localhost:0");
c.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, "localhost:0");
c.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0");
c.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
c.setVar(ConfVars.WORKER_TEMPORAL_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo-localdir");
c.setVar(ConfVars.REST_SERVICE_ADDRESS, "localhost:0");
if (!local) {
String tajoRootDir = getMiniDFSCluster().getFileSystem().getUri().toString() + "/tajo";
c.setVar(ConfVars.ROOT_DIR, tajoRootDir);
URI defaultTsUri = TajoConf.getWarehouseDir(c).toUri();
FileTablespace defaultTableSpace =
new FileTablespace(TablespaceManager.DEFAULT_TABLESPACE_NAME, defaultTsUri, null);
defaultTableSpace.init(conf);
TablespaceManager.addTableSpaceForTest(defaultTableSpace);
} else {
c.setVar(ConfVars.ROOT_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo");
}
setupCatalogForTesting(c, testBuildDir);
LOG.info("derby repository is set to " + conf.get(CatalogConstants.CATALOG_URI));
tajoMaster = new TajoMaster();
tajoMaster.init(c);
tajoMaster.start();
this.conf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, c.getVar(ConfVars.WORKER_PEER_RPC_ADDRESS));
this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, c.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
InetSocketAddress tajoMasterAddress = tajoMaster.getContext().getTajoMasterService().getBindAddress();
this.conf.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, NetUtils.getHostPortString(tajoMasterAddress));
this.conf.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, c.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS));
InetSocketAddress tajoRestAddress = tajoMaster.getContext().getRestServer().getBindAddress();
this.conf.setVar(ConfVars.REST_SERVICE_ADDRESS, NetUtils.getHostPortString(tajoRestAddress));
startTajoWorkers(numSlaves);
isTajoClusterRunning = true;
LOG.info("Mini Tajo cluster is up");
LOG.info("====================================================================================");
LOG.info("= MiniTajoCluster starts up =");
LOG.info("====================================================================================");
LOG.info("= * Master Address: " + tajoMaster.getMasterName());
LOG.info("= * CatalogStore: " + tajoMaster.getCatalogServer().getStoreClassName());
LOG.info("------------------------------------------------------------------------------------");
LOG.info("= * Warehouse Dir: " + TajoConf.getWarehouseDir(c));
LOG.info("= * Worker Tmp Dir: " + c.getVar(ConfVars.WORKER_TEMPORAL_DIR));
LOG.info("====================================================================================");
}
private void setupCatalogForTesting(TajoConf c, File testBuildDir) throws IOException, UnsupportedCatalogStore {
final String HIVE_CATALOG_CLASS_NAME = "org.apache.tajo.catalog.store.HiveCatalogStore";
boolean hiveCatalogClassExists = false;
try {
getClass().getClassLoader().loadClass(HIVE_CATALOG_CLASS_NAME);
hiveCatalogClassExists = true;
} catch (ClassNotFoundException e) {
LOG.info("HiveCatalogStore is not available.");
}
String driverClass = System.getProperty(CatalogConstants.STORE_CLASS);
if (hiveCatalogClassExists &&
driverClass != null && driverClass.equals(HIVE_CATALOG_CLASS_NAME)) {
try {
getClass().getClassLoader().loadClass(HIVE_CATALOG_CLASS_NAME);
String jdbcUri = "jdbc:derby:;databaseName="+ testBuildDir.toURI().getPath() + "/metastore_db;create=true";
c.set("hive.metastore.warehouse.dir", TajoConf.getWarehouseDir(c).toString() + "/default");
c.set("javax.jdo.option.ConnectionURL", jdbcUri);
c.set(TajoConf.ConfVars.WAREHOUSE_DIR.varname, conf.getVar(ConfVars.WAREHOUSE_DIR));
c.set(CatalogConstants.STORE_CLASS, HIVE_CATALOG_CLASS_NAME);
Path defaultDatabasePath = new Path(TajoConf.getWarehouseDir(c).toString() + "/default");
FileSystem fs = defaultDatabasePath.getFileSystem(c);
if (!fs.exists(defaultDatabasePath)) {
fs.mkdirs(defaultDatabasePath);
}
isHiveCatalogStoreUse = true;
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
} else { // for derby
CatalogTestingUtil.configureCatalog(conf, testBuildDir.getAbsolutePath());
}
c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
}
private void startTajoWorkers(int numSlaves) throws Exception {
for(int i = 0; i < 1; i++) {
TajoWorker tajoWorker = new TajoWorker();
TajoConf workerConf = new TajoConf(this.conf);
workerConf.setVar(ConfVars.WORKER_INFO_ADDRESS, "localhost:0");
workerConf.setVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS, "localhost:0");
workerConf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
workerConf.setVar(ConfVars.WORKER_QM_RPC_ADDRESS, "localhost:0");
tajoWorker.startWorker(workerConf, new String[0]);
LOG.info("MiniTajoCluster Worker #" + (i + 1) + " started.");
tajoWorkers.add(tajoWorker);
}
}
public TajoMaster getMaster() {
return this.tajoMaster;
}
public List<TajoWorker> getTajoWorkers() {
return this.tajoWorkers;
}
public void shutdownMiniTajoCluster() {
if(this.tajoMaster != null) {
this.tajoMaster.stop();
}
for(TajoWorker eachWorker: tajoWorkers) {
eachWorker.stopWorkerForce();
}
tajoWorkers.clear();
this.tajoMaster= null;
}
////////////////////////////////////////////////////////
// Meta Cluster Section
////////////////////////////////////////////////////////
/**
* @throws java.io.IOException If a cluster -- dfs or engine -- already running.
*/
void isRunningCluster() throws IOException {
if (!isTajoClusterRunning && !isCatalogServerRunning && !isDFSRunning) return;
throw new IOException("Cluster already running at " +
this.clusterTestBuildDir);
}
/**
* This method starts up a tajo cluster with a given number of clusters in
* distributed mode.
*
* @param numSlaves the number of tajo cluster to start up
* @throws Exception
*/
public void startMiniCluster(final int numSlaves)
throws Exception {
startMiniCluster(numSlaves, null);
}
public void startMiniCluster(final int numSlaves, final String [] dataNodeHosts) throws Exception {
int numDataNodes = numSlaves;
if (dataNodeHosts != null && dataNodeHosts.length != 0) {
numDataNodes = dataNodeHosts.length;
}
LOG.info("Starting up minicluster with 1 master(s) and " +
numSlaves + " worker(s) and " + numDataNodes + " datanode(s)");
// If we already bring up the cluster, fail.
isRunningCluster();
if (clusterTestBuildDir != null) {
LOG.info("Using passed path: " + clusterTestBuildDir);
}
startMiniDFSCluster(numDataNodes, clusterTestBuildDir, dataNodeHosts);
this.dfsCluster.waitClusterUp();
hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir);
startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);
}
public void startMiniClusterInLocal(final int numSlaves) throws Exception {
isRunningCluster();
if (clusterTestBuildDir != null) {
LOG.info("Using passed path: " + clusterTestBuildDir);
}
startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, true);
}
public void shutdownMiniCluster() throws IOException {
LOG.info("========================================");
LOG.info("Minicluster is stopping");
LOG.info("========================================");
shutdownMiniTajoCluster();
if(this.catalogServer != null) {
shutdownCatalogCluster();
isCatalogServerRunning = false;
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(this.dfsCluster != null) {
try {
FileSystem fs = this.dfsCluster.getFileSystem();
if (fs != null) fs.close();
this.dfsCluster.shutdown();
} catch (IOException e) {
System.err.println("error closing file system: " + e);
}
isDFSRunning = false;
}
if(this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) {
if(!ShutdownHookManager.get().isShutdownInProgress()) {
//TODO clean test dir when ShutdownInProgress
LocalFileSystem localFS = LocalFileSystem.getLocal(conf);
localFS.delete(new Path(clusterTestBuildDir.toString()), true);
localFS.close();
}
this.clusterTestBuildDir = null;
}
if(hbaseUtil != null) {
hbaseUtil.stopZooKeeperCluster();
hbaseUtil.stopHBaseCluster();
}
LOG.info("Minicluster is down");
isTajoClusterRunning = false;
}
public TajoClient newTajoClient() throws Exception {
return new TajoClientImpl(ServiceTrackerFactory.get(getConfiguration()));
}
public static ResultSet run(String[] names,
Schema[] schemas,
String[][] tables,
String query,
TajoClient client) throws Exception {
TajoTestingCluster util = TpchTestBase.getInstance().getTestingCluster();
FileSystem fs = util.getDefaultFileSystem();
Path rootDir = TajoConf.getWarehouseDir(util.getConfiguration());
fs.mkdirs(rootDir);
for (int i = 0; i < names.length; i++) {
createTable(util.conf, names[i], schemas[i], tables[i]);
}
ResultSet res = client.executeQueryAndGetResult(query);
return res;
}
public static ResultSet run(String[] names,
Schema[] schemas,
String[][] tables,
String query) throws Exception {
TpchTestBase instance = TpchTestBase.getInstance();
TajoTestingCluster util = instance.getTestingCluster();
while(true) {
if(util.getMaster().isMasterRunning()) {
break;
}
Thread.sleep(1000);
}
TajoConf conf = util.getConfiguration();
try (TajoClient client = new TajoClientImpl(ServiceTrackerFactory.get(conf))) {
return run(names, schemas, tables, query, client);
}
}
public static TajoClient newTajoClient(TajoTestingCluster util) throws SQLException, InterruptedException {
while(true) {
if(util.getMaster().isMasterRunning()) {
break;
}
Thread.sleep(1000);
}
TajoConf conf = util.getConfiguration();
return new TajoClientImpl(ServiceTrackerFactory.get(conf));
}
public static void createTable(TajoConf conf, String tableName, Schema schema,
String[] tableDatas) throws Exception {
createTable(conf, tableName, schema, tableDatas, 1);
}
public static void createTable(TajoConf conf, String tableName, Schema schema,
String[] tableDatas, int numDataFiles) throws Exception {
TpchTestBase instance = TpchTestBase.getInstance();
TajoTestingCluster util = instance.getTestingCluster();
try (TajoClient client = newTajoClient(util)) {
FileSystem fs = util.getDefaultFileSystem();
Path rootDir = TajoConf.getWarehouseDir(util.getConfiguration());
if (!fs.exists(rootDir)) {
fs.mkdirs(rootDir);
}
Path tablePath;
if (CatalogUtil.isFQTableName(tableName)) {
Pair<String, String> name = CatalogUtil.separateQualifierAndName(tableName);
tablePath = new Path(rootDir, new Path(name.getFirst(), name.getSecond()));
} else {
tablePath = new Path(rootDir, tableName);
}
fs.mkdirs(tablePath);
if (tableDatas.length > 0) {
int recordPerFile = tableDatas.length / numDataFiles;
if (recordPerFile == 0) {
recordPerFile = 1;
}
FSDataOutputStream out = null;
for (int j = 0; j < tableDatas.length; j++) {
if (out == null || j % recordPerFile == 0) {
if (out != null) {
out.close();
}
Path dfsPath = new Path(tablePath, tableName + j + ".tbl");
out = fs.create(dfsPath);
}
out.write((tableDatas[j] + "\n").getBytes());
}
if (out != null) {
out.close();
}
}
TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT, conf);
client.createExternalTable(tableName, schema, tablePath.toUri(), meta);
}
}
/**
* Write lines to a file.
*
* @param file File to write lines to
* @param lines Strings written to the file
* @throws java.io.IOException
*/
private static void writeLines(File file, String... lines)
throws IOException {
Writer writer = Files.newWriter(file, Charsets.UTF_8);
try {
for (String line : lines) {
writer.write(line);
writer.write('\n');
}
} finally {
Closeables.closeQuietly(writer);
}
}
public void setAllTajoDaemonConfValue(String key, String value) {
tajoMaster.getContext().getConf().set(key, value);
setAllWorkersConfValue(key, value);
}
public void setAllWorkersConfValue(String key, String value) {
for (TajoWorker eachWorker: tajoWorkers) {
eachWorker.getConfig().set(key, value);
}
}
public void waitForQuerySubmitted(QueryId queryId) throws Exception {
waitForQuerySubmitted(queryId, 50);
}
public void waitForQuerySubmitted(QueryId queryId, int delay) throws Exception {
QueryMasterTask qmt = null;
int i = 0;
while (qmt == null || TajoClientUtil.isQueryWaitingForSchedule(qmt.getState())) {
try {
Thread.sleep(delay);
if (qmt == null) {
qmt = getQueryMasterTask(queryId);
}
} catch (InterruptedException e) {
}
if (++i > 200) {
throw new IOException("Timed out waiting for query to start");
}
}
}
public void waitForQueryState(Query query, TajoProtos.QueryState expected, int delay) throws Exception {
int i = 0;
while (query == null || query.getSynchronizedState() != expected) {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
}
if (++i > 200) {
throw new IOException("Timed out waiting. expected: " + expected +
", actual: " + query != null ? String.valueOf(query.getSynchronizedState()) : String.valueOf(query));
}
}
}
public void waitForStageState(Stage stage, StageState expected, int delay) throws Exception {
int i = 0;
while (stage == null || stage.getSynchronizedState() != expected) {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
}
if (++i > 200) {
throw new IOException("Timed out waiting");
}
}
}
public QueryMasterTask getQueryMasterTask(QueryId queryId) {
QueryMasterTask qmt = null;
for (TajoWorker worker : getTajoWorkers()) {
qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true);
if (qmt != null && queryId.equals(qmt.getQueryId())) {
break;
}
}
return qmt;
}
public QueryHistory getQueryHistory(QueryId queryId) throws IOException {
QueryHistory queryHistory = null;
for (TajoWorker worker : getTajoWorkers()) {
queryHistory = worker.getWorkerContext().getQueryMaster().getQueryHistory(queryId);
if (queryHistory != null) {
break;
}
}
return queryHistory;
}
}