| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo; |
| |
| import com.google.common.base.Charsets; |
| import com.google.common.base.Preconditions; |
| import com.google.common.io.Closeables; |
| import com.google.common.io.Files; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.*; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.util.ShutdownHookManager; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; |
| import org.apache.tajo.catalog.*; |
| import org.apache.tajo.catalog.Options; |
| import org.apache.tajo.catalog.proto.CatalogProtos; |
| import org.apache.tajo.client.TajoClient; |
| import org.apache.tajo.conf.TajoConf; |
| import org.apache.tajo.conf.TajoConf.ConfVars; |
| import org.apache.tajo.master.TajoMaster; |
| import org.apache.tajo.master.rm.TajoWorkerResourceManager; |
| import org.apache.tajo.master.rm.YarnTajoResourceManager; |
| import org.apache.tajo.util.CommonTestingUtil; |
| import org.apache.tajo.util.NetUtils; |
| import org.apache.tajo.worker.TajoWorker; |
| |
| import java.io.*; |
| import java.net.InetSocketAddress; |
| import java.net.URL; |
| import java.sql.ResultSet; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.UUID; |
| |
| public class TajoTestingCluster { |
| private static Log LOG = LogFactory.getLog(TajoTestingCluster.class); |
| private TajoConf conf; |
| |
| protected MiniTajoYarnCluster yarnCluster; |
| private FileSystem defaultFS; |
| private MiniDFSCluster dfsCluster; |
| private MiniCatalogServer catalogServer; |
| |
| private TajoMaster tajoMaster; |
| private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>(); |
| private boolean standbyWorkerMode = false; |
| |
| // If non-null, then already a cluster running. |
| private File clusterTestBuildDir = null; |
| |
| /** |
| * System property key to get test directory value. |
| * Name is as it is because mini dfs has hard-codings to put test data here. |
| */ |
| public static final String TEST_DIRECTORY_KEY = MiniDFSCluster.PROP_TEST_BUILD_DATA; |
| |
| /** |
| * Default parent directory for test output. |
| */ |
| public static final String DEFAULT_TEST_DIRECTORY = "target/test-data"; |
| |
/**
 * Creates a testing cluster around a fresh {@link TajoConf} and applies the
 * test-specific settings through {@link #initPropertiesAndConfigs()}.
 */
public TajoTestingCluster() {
  conf = new TajoConf();
  initPropertiesAndConfigs();
}
| |
| void initPropertiesAndConfigs() { |
| if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) { |
| String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname); |
| Preconditions.checkState( |
| testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()) || |
| testResourceManager.equals(YarnTajoResourceManager.class.getCanonicalName()), |
| ConfVars.RESOURCE_MANAGER_CLASS.varname + " must be either " + TajoWorkerResourceManager.class.getCanonicalName() + " or " + |
| YarnTajoResourceManager.class.getCanonicalName() +"." |
| ); |
| conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname)); |
| } |
| conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024); |
| conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f); |
| |
| this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS) |
| .indexOf(TajoWorkerResourceManager.class.getName()) >= 0; |
| conf.set(CommonTestingUtil.TAJO_TEST, "TRUE"); |
| } |
| |
| public TajoConf getConfiguration() { |
| return this.conf; |
| } |
| |
| public void initTestDir() { |
| if (System.getProperty(TEST_DIRECTORY_KEY) == null) { |
| clusterTestBuildDir = setupClusterTestBuildDir(); |
| System.setProperty(TEST_DIRECTORY_KEY, |
| clusterTestBuildDir.getAbsolutePath()); |
| } |
| } |
| |
| /** |
| * @return Where to write test data on local filesystem; usually |
| * {@link #DEFAULT_TEST_DIRECTORY} |
| * @see #setupClusterTestBuildDir() |
| */ |
| public static File getTestDir() { |
| return new File(System.getProperty(TEST_DIRECTORY_KEY, |
| DEFAULT_TEST_DIRECTORY)); |
| } |
| |
| /** |
| * @param subdirName |
| * @return Path to a subdirectory named <code>subdirName</code> under |
| * {@link #getTestDir()}. |
| * @see #setupClusterTestBuildDir() |
| */ |
| public static File getTestDir(final String subdirName) { |
| return new File(getTestDir(), subdirName); |
| } |
| |
| public File setupClusterTestBuildDir() { |
| String randomStr = UUID.randomUUID().toString(); |
| String dirStr = getTestDir(randomStr).toString(); |
| File dir = new File(dirStr).getAbsoluteFile(); |
| // Have it cleaned up on exit |
| dir.deleteOnExit(); |
| return dir; |
| } |
| |
| //////////////////////////////////////////////////////// |
| // HDFS Section |
| //////////////////////////////////////////////////////// |
| /** |
| * Start a minidfscluster. |
| * @param servers How many DNs to start. |
| * @throws Exception |
| * @see {@link #shutdownMiniDFSCluster()} |
| * @return The mini dfs cluster created. |
| */ |
| public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception { |
| return startMiniDFSCluster(servers, null, null); |
| } |
| |
| /** |
| * Start a minidfscluster. |
| * Can only create one. |
| * @param servers How many DNs to start. |
| * @param dir Where to home your dfs cluster. |
| * @param hosts hostnames DNs to run on. |
| * @throws Exception |
| * @see {@link #shutdownMiniDFSCluster()} |
| * @return The mini dfs cluster created. |
| * @throws java.io.IOException |
| */ |
| public MiniDFSCluster startMiniDFSCluster(int servers, |
| final File dir, |
| final String hosts[]) |
| throws IOException { |
| if (dir == null) { |
| this.clusterTestBuildDir = setupClusterTestBuildDir(); |
| } else { |
| this.clusterTestBuildDir = dir; |
| } |
| |
| System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, |
| this.clusterTestBuildDir.toString()); |
| |
| conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); |
| conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); |
| MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf)); |
| builder.hosts(hosts); |
| builder.numDataNodes(servers); |
| builder.format(true); |
| builder.manageNameDfsDirs(true); |
| builder.manageDataDfsDirs(true); |
| builder.waitSafeMode(true); |
| this.dfsCluster = builder.build(); |
| |
| // Set this just-started cluser as our filesystem. |
| this.defaultFS = this.dfsCluster.getFileSystem(); |
| this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString()); |
| this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo"); |
| |
| return this.dfsCluster; |
| } |
| |
| public void shutdownMiniDFSCluster() throws Exception { |
| if (this.dfsCluster != null) { |
| try { |
| FileSystem fs = this.dfsCluster.getFileSystem(); |
| if (fs != null) fs.close(); |
| } catch (IOException e) { |
| System.err.println("error closing file system: " + e); |
| } |
| // The below throws an exception per dn, AsynchronousCloseException. |
| this.dfsCluster.shutdown(); |
| } |
| } |
| |
| public boolean isRunningDFSCluster() { |
| return this.defaultFS != null; |
| } |
| |
| public MiniDFSCluster getMiniDFSCluster() { |
| return this.dfsCluster; |
| } |
| |
| public FileSystem getDefaultFileSystem() { |
| return this.defaultFS; |
| } |
| |
| //////////////////////////////////////////////////////// |
| // Catalog Section |
| //////////////////////////////////////////////////////// |
| public MiniCatalogServer startCatalogCluster() throws Exception { |
| TajoConf c = getConfiguration(); |
| |
| if(clusterTestBuildDir == null) { |
| clusterTestBuildDir = setupClusterTestBuildDir(); |
| } |
| |
| conf.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore"); |
| conf.set(CatalogConstants.CATALOG_URI, "jdbc:derby:" + clusterTestBuildDir.getAbsolutePath() + "/db"); |
| LOG.info("Apache Derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI)); |
| conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0"); |
| |
| catalogServer = new MiniCatalogServer(conf); |
| CatalogServer catServer = catalogServer.getCatalogServer(); |
| InetSocketAddress sockAddr = catServer.getBindAddress(); |
| c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.normalizeInetSocketAddress(sockAddr)); |
| |
| return this.catalogServer; |
| } |
| |
| public void shutdownCatalogCluster() { |
| if (catalogServer != null) { |
| this.catalogServer.shutdown(); |
| } |
| } |
| |
| public MiniCatalogServer getMiniCatalogCluster() { |
| return this.catalogServer; |
| } |
| |
| //////////////////////////////////////////////////////// |
| // Tajo Cluster Section |
| //////////////////////////////////////////////////////// |
| private void startMiniTajoCluster(File testBuildDir, |
| final int numSlaves, |
| boolean local) throws Exception { |
| TajoConf c = getConfiguration(); |
| c.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, "localhost:0"); |
| c.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, "localhost:0"); |
| c.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0"); |
| c.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0"); |
| c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0"); |
| c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore"); |
| c.set(CatalogConstants.CATALOG_URI, "jdbc:derby:" + testBuildDir.getAbsolutePath() + "/db"); |
| c.setVar(ConfVars.WORKER_TEMPORAL_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo-localdir"); |
| |
| LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI)); |
| |
| if (!local) { |
| c.setVar(ConfVars.ROOT_DIR, |
| getMiniDFSCluster().getFileSystem().getUri() + "/tajo"); |
| } else { |
| c.setVar(ConfVars.ROOT_DIR, |
| clusterTestBuildDir.getAbsolutePath() + "/tajo"); |
| } |
| |
| tajoMaster = new TajoMaster(); |
| tajoMaster.init(c); |
| tajoMaster.start(); |
| |
| this.conf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, c.getVar(ConfVars.WORKER_PEER_RPC_ADDRESS)); |
| this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, c.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)); |
| |
| InetSocketAddress tajoMasterAddress = tajoMaster.getContext().getTajoMasterService().getBindAddress(); |
| |
| this.conf.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, |
| tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort()); |
| this.conf.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, c.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS)); |
| this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS)); |
| |
| if(standbyWorkerMode) { |
| startTajoWorkers(numSlaves); |
| } |
| LOG.info("Mini Tajo cluster is up"); |
| } |
| |
| private void startTajoWorkers(int numSlaves) throws Exception { |
| for(int i = 0; i < 1; i++) { |
| TajoWorker tajoWorker = new TajoWorker(); |
| |
| TajoConf workerConf = new TajoConf(this.conf); |
| |
| workerConf.setVar(ConfVars.WORKER_INFO_ADDRESS, "localhost:0"); |
| workerConf.setVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS, "localhost:0"); |
| workerConf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0"); |
| |
| workerConf.setVar(ConfVars.WORKER_QM_RPC_ADDRESS, "localhost:0"); |
| |
| tajoWorker.startWorker(workerConf, new String[]{"standby"}); |
| |
| LOG.info("MiniTajoCluster Worker #" + (i + 1) + " started."); |
| tajoWorkers.add(tajoWorker); |
| } |
| } |
| |
| public void restartTajoCluster(int numSlaves) throws Exception { |
| tajoMaster.stop(); |
| tajoMaster.start(); |
| |
| LOG.info("Minicluster has been restarted"); |
| } |
| |
| public TajoMaster getMaster() { |
| return this.tajoMaster; |
| } |
| |
| public List<TajoWorker> getTajoWorkers() { |
| return this.tajoWorkers; |
| } |
| |
| public void shutdownMiniTajoCluster() { |
| if(this.tajoMaster != null) { |
| this.tajoMaster.stop(); |
| } |
| for(TajoWorker eachWorker: tajoWorkers) { |
| eachWorker.stopWorkerForce(); |
| } |
| tajoWorkers.clear(); |
| this.tajoMaster= null; |
| } |
| |
| //////////////////////////////////////////////////////// |
| // Meta Cluster Section |
| //////////////////////////////////////////////////////// |
| /** |
| * @throws java.io.IOException If a cluster -- dfs or engine -- already running. |
| */ |
| void isRunningCluster(String passedBuildPath) throws IOException { |
| if (this.clusterTestBuildDir == null || passedBuildPath != null) return; |
| throw new IOException("Cluster already running at " + |
| this.clusterTestBuildDir); |
| } |
| |
| /** |
| * This method starts up a tajo cluster with a given number of clusters in |
| * distributed mode. |
| * |
| * @param numSlaves the number of tajo cluster to start up |
| * @throws Exception |
| */ |
| public void startMiniCluster(final int numSlaves) |
| throws Exception { |
| startMiniCluster(numSlaves, null); |
| } |
| |
| public void startMiniCluster(final int numSlaves, final String [] dataNodeHosts) throws Exception { |
| |
| int numDataNodes = numSlaves; |
| if(dataNodeHosts != null && dataNodeHosts.length != 0) { |
| numDataNodes = dataNodeHosts.length; |
| } |
| |
| LOG.info("Starting up minicluster with 1 master(s) and " + |
| numSlaves + " worker(s) and " + numDataNodes + " datanode(s)"); |
| |
| // If we already put up a cluster, fail. |
| String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null); |
| isRunningCluster(testBuildPath); |
| if (testBuildPath != null) { |
| LOG.info("Using passed path: " + testBuildPath); |
| } |
| |
| // Make a new random dir to home everything in. Set it as system property. |
| // minidfs reads home from system property. |
| this.clusterTestBuildDir = testBuildPath == null? |
| setupClusterTestBuildDir() : new File(testBuildPath); |
| |
| System.setProperty(TEST_DIRECTORY_KEY, |
| this.clusterTestBuildDir.getAbsolutePath()); |
| |
| startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts); |
| this.dfsCluster.waitClusterUp(); |
| |
| if(!standbyWorkerMode) { |
| startMiniYarnCluster(); |
| } |
| |
| startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false); |
| } |
| |
| private void startMiniYarnCluster() throws Exception { |
| LOG.info("Starting up YARN cluster"); |
| // Scheduler properties required for YARN to work |
| conf.set("yarn.scheduler.capacity.root.queues", "default"); |
| conf.set("yarn.scheduler.capacity.root.default.capacity", "100"); |
| |
| conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 384); |
| conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 3000); |
| conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1); |
| conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 2); |
| |
| if (yarnCluster == null) { |
| yarnCluster = new MiniTajoYarnCluster(TajoTestingCluster.class.getName(), 3); |
| yarnCluster.init(conf); |
| yarnCluster.start(); |
| |
| ResourceManager resourceManager = yarnCluster.getResourceManager(); |
| InetSocketAddress rmAddr = resourceManager.getClientRMService().getBindAddress(); |
| InetSocketAddress rmSchedulerAddr = resourceManager.getApplicationMasterService().getBindAddress(); |
| conf.set(YarnConfiguration.RM_ADDRESS, NetUtils.normalizeInetSocketAddress(rmAddr)); |
| conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, NetUtils.normalizeInetSocketAddress(rmSchedulerAddr)); |
| |
| URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml"); |
| if (url == null) { |
| throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath"); |
| } |
| yarnCluster.getConfig().set("yarn.application.classpath", new File(url.getPath()).getParent()); |
| OutputStream os = new FileOutputStream(new File(url.getPath())); |
| yarnCluster.getConfig().writeXml(os); |
| os.close(); |
| } |
| } |
| |
| public void startMiniClusterInLocal(final int numSlaves) throws Exception { |
| // If we already put up a cluster, fail. |
| String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null); |
| isRunningCluster(testBuildPath); |
| if (testBuildPath != null) { |
| LOG.info("Using passed path: " + testBuildPath); |
| } |
| |
| // Make a new random dir to home everything in. Set it as system property. |
| // minidfs reads home from system property. |
| this.clusterTestBuildDir = testBuildPath == null? |
| setupClusterTestBuildDir() : new File(testBuildPath); |
| |
| System.setProperty(TEST_DIRECTORY_KEY, |
| this.clusterTestBuildDir.getAbsolutePath()); |
| |
| startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, true); |
| } |
| |
| public void shutdownMiniCluster() throws IOException { |
| LOG.info("========================================"); |
| LOG.info("Minicluster is stopping"); |
| LOG.info("========================================"); |
| |
| try { |
| Thread.sleep(3000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| |
| shutdownMiniTajoCluster(); |
| |
| if(this.catalogServer != null) { |
| shutdownCatalogCluster(); |
| } |
| |
| if(this.yarnCluster != null) { |
| this.yarnCluster.stop(); |
| } |
| |
| try { |
| Thread.sleep(3000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| |
| if(this.dfsCluster != null) { |
| |
| try { |
| FileSystem fs = this.dfsCluster.getFileSystem(); |
| if (fs != null) fs.close(); |
| this.dfsCluster.shutdown(); |
| } catch (IOException e) { |
| System.err.println("error closing file system: " + e); |
| } |
| } |
| |
| if(this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) { |
| if(!ShutdownHookManager.get().isShutdownInProgress()) { |
| //TODO clean test dir when ShutdownInProgress |
| LocalFileSystem localFS = LocalFileSystem.getLocal(conf); |
| localFS.delete(new Path(clusterTestBuildDir.toString()), true); |
| localFS.close(); |
| } |
| this.clusterTestBuildDir = null; |
| } |
| LOG.info("Minicluster is down"); |
| } |
| |
| public static ResultSet run(String[] names, |
| Schema[] schemas, |
| Options option, |
| String[][] tables, |
| String query) throws Exception { |
| TpchTestBase instance = TpchTestBase.getInstance(); |
| TajoTestingCluster util = instance.getTestingCluster(); |
| while(true) { |
| if(util.getMaster().isMasterRunning()) { |
| break; |
| } |
| Thread.sleep(1000); |
| } |
| TajoConf conf = util.getConfiguration(); |
| TajoClient client = new TajoClient(conf); |
| |
| FileSystem fs = util.getDefaultFileSystem(); |
| Path rootDir = util.getMaster(). |
| getStorageManager().getWarehouseDir(); |
| fs.mkdirs(rootDir); |
| for (int i = 0; i < names.length; i++) { |
| Path tablePath = new Path(rootDir, names[i]); |
| fs.mkdirs(tablePath); |
| Path dfsPath = new Path(tablePath, names[i] + ".tbl"); |
| FSDataOutputStream out = fs.create(dfsPath); |
| for (int j = 0; j < tables[i].length; j++) { |
| out.write((tables[i][j]+"\n").getBytes()); |
| } |
| out.close(); |
| TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, option); |
| client.createExternalTable(names[i], schemas[i], tablePath, meta); |
| } |
| Thread.sleep(1000); |
| ResultSet res = client.executeQueryAndGetResult(query); |
| return res; |
| } |
| |
| /** |
| * Write lines to a file. |
| * |
| * @param file File to write lines to |
| * @param lines Strings written to the file |
| * @throws java.io.IOException |
| */ |
| private static void writeLines(File file, String... lines) |
| throws IOException { |
| Writer writer = Files.newWriter(file, Charsets.UTF_8); |
| try { |
| for (String line : lines) { |
| writer.write(line); |
| writer.write('\n'); |
| } |
| } finally { |
| Closeables.closeQuietly(writer); |
| } |
| } |
| } |