| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.hcatalog.hbase; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.MiniHBaseCluster; |
| import org.apache.hadoop.hbase.client.HConnectionManager; |
| import org.apache.hadoop.hbase.client.HTable; |
| import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.MiniMRCluster; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.ServerSocket; |
| |
| /** |
| * MiniCluster class composed of a number of Hadoop Minicluster implementations |
| * and other necessary daemons needed for testing (HBase, Hive MetaStore, Zookeeper, MiniMRCluster) |
| */ |
| public class ManyMiniCluster { |
| |
| //MR stuff |
| private boolean miniMRClusterEnabled; |
| private MiniMRCluster mrCluster; |
| private int numTaskTrackers; |
| private JobConf jobConf; |
| |
| //HBase stuff |
| private boolean miniHBaseClusterEnabled; |
| private MiniHBaseCluster hbaseCluster; |
| private String hbaseRoot; |
| private Configuration hbaseConf; |
| private String hbaseDir; |
| |
| //ZK Stuff |
| private boolean miniZookeeperClusterEnabled; |
| private MiniZooKeeperCluster zookeeperCluster; |
| private int zookeeperPort; |
| private String zookeeperDir; |
| |
| //DFS Stuff |
| private MiniDFSCluster dfsCluster; |
| |
| //Hive Stuff |
| private boolean miniHiveMetastoreEnabled; |
| private HiveConf hiveConf; |
| private HiveMetaStoreClient hiveMetaStoreClient; |
| |
| private final File workDir; |
| private boolean started = false; |
| |
| |
| /** |
| * create a cluster instance using a builder which will expose configurable options |
| * @param workDir working directory ManyMiniCluster will use for all of it's *Minicluster instances |
| * @return a Builder instance |
| */ |
| public static Builder create(File workDir) { |
| return new Builder(workDir); |
| } |
| |
| private ManyMiniCluster(Builder b) { |
| workDir = b.workDir; |
| numTaskTrackers = b.numTaskTrackers; |
| hiveConf = b.hiveConf; |
| jobConf = b.jobConf; |
| hbaseConf = b.hbaseConf; |
| miniMRClusterEnabled = b.miniMRClusterEnabled; |
| miniHBaseClusterEnabled = b.miniHBaseClusterEnabled; |
| miniHiveMetastoreEnabled = b.miniHiveMetastoreEnabled; |
| miniZookeeperClusterEnabled = b.miniZookeeperClusterEnabled; |
| } |
| |
| protected synchronized void start() { |
| try { |
| if (!started) { |
| FileUtil.fullyDelete(workDir); |
| if (miniMRClusterEnabled) { |
| setupMRCluster(); |
| } |
| if (miniZookeeperClusterEnabled || miniHBaseClusterEnabled) { |
| miniZookeeperClusterEnabled = true; |
| setupZookeeper(); |
| } |
| if (miniHBaseClusterEnabled) { |
| setupHBaseCluster(); |
| } |
| if (miniHiveMetastoreEnabled) { |
| setUpMetastore(); |
| } |
| } |
| } catch (Exception e) { |
| throw new IllegalStateException("Failed to setup cluster", e); |
| } |
| } |
| |
| protected synchronized void stop() { |
| if (hbaseCluster != null) { |
| HConnectionManager.deleteAllConnections(true); |
| try { |
| hbaseCluster.shutdown(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| hbaseCluster = null; |
| } |
| if (zookeeperCluster != null) { |
| try { |
| zookeeperCluster.shutdown(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| zookeeperCluster = null; |
| } |
| if (mrCluster != null) { |
| try { |
| mrCluster.shutdown(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| mrCluster = null; |
| } |
| if (dfsCluster != null) { |
| try { |
| dfsCluster.getFileSystem().close(); |
| dfsCluster.shutdown(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| dfsCluster = null; |
| } |
| try { |
| FileSystem.closeAll(); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } |
| started = false; |
| } |
| |
| /** |
| * @return Configuration of mini HBase cluster |
| */ |
| public Configuration getHBaseConf() { |
| return HBaseConfiguration.create(hbaseConf); |
| } |
| |
| /** |
| * @return Configuration of mini MR cluster |
| */ |
| public Configuration getJobConf() { |
| return new Configuration(jobConf); |
| } |
| |
| /** |
| * @return Configuration of Hive Metastore, this is a standalone not a daemon |
| */ |
| public HiveConf getHiveConf() { |
| return new HiveConf(hiveConf); |
| } |
| |
| /** |
| * @return Filesystem used by MiniMRCluster and MiniHBaseCluster |
| */ |
| public FileSystem getFileSystem() { |
| try { |
| return FileSystem.get(jobConf); |
| } catch (IOException e) { |
| throw new IllegalStateException("Failed to get FileSystem", e); |
| } |
| } |
| |
| /** |
| * @return Metastore client instance |
| */ |
| public HiveMetaStoreClient getHiveMetaStoreClient() { |
| return hiveMetaStoreClient; |
| } |
| |
| private void setupMRCluster() { |
| try { |
| final int jobTrackerPort = findFreePort(); |
| final int taskTrackerPort = findFreePort(); |
| |
| if (jobConf == null) |
| jobConf = new JobConf(); |
| |
| jobConf.setInt("mapred.submit.replication", 1); |
| //conf.set("hadoop.job.history.location",new File(workDir).getAbsolutePath()+"/history"); |
| System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath()); |
| |
| mrCluster = new MiniMRCluster(jobTrackerPort, |
| taskTrackerPort, |
| numTaskTrackers, |
| getFileSystem().getUri().toString(), |
| numTaskTrackers, |
| null, |
| null, |
| null, |
| jobConf); |
| |
| jobConf = mrCluster.createJobConf(); |
| } catch (IOException e) { |
| throw new IllegalStateException("Failed to Setup MR Cluster", e); |
| } |
| } |
| |
| private void setupZookeeper() { |
| try { |
| zookeeperDir = new File(workDir, "zk").getAbsolutePath(); |
| zookeeperPort = findFreePort(); |
| zookeeperCluster = new MiniZooKeeperCluster(); |
| zookeeperCluster.setDefaultClientPort(zookeeperPort); |
| zookeeperCluster.startup(new File(zookeeperDir)); |
| } catch (Exception e) { |
| throw new IllegalStateException("Failed to Setup Zookeeper Cluster", e); |
| } |
| } |
| |
| private void setupHBaseCluster() { |
| final int numRegionServers = 1; |
| |
| try { |
| hbaseDir = new File(workDir, "hbase").getAbsolutePath(); |
| hbaseRoot = "file://" + hbaseDir; |
| |
| if (hbaseConf == null) |
| hbaseConf = HBaseConfiguration.create(); |
| |
| hbaseConf.set("hbase.rootdir", hbaseRoot); |
| hbaseConf.set("hbase.master", "local"); |
| hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zookeeperPort); |
| hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1"); |
| hbaseConf.setInt("hbase.master.port", findFreePort()); |
| hbaseConf.setInt("hbase.master.info.port", -1); |
| hbaseConf.setInt("hbase.regionserver.port", findFreePort()); |
| hbaseConf.setInt("hbase.regionserver.info.port", -1); |
| |
| hbaseCluster = new MiniHBaseCluster(hbaseConf, numRegionServers); |
| hbaseConf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort()); |
| //opening the META table ensures that cluster is running |
| new HTable(hbaseConf, HConstants.META_TABLE_NAME); |
| } catch (Exception e) { |
| throw new IllegalStateException("Failed to setup HBase Cluster", e); |
| } |
| } |
| |
| private void setUpMetastore() throws Exception { |
| if (hiveConf == null) |
| hiveConf = new HiveConf(this.getClass()); |
| |
| //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook |
| //is present only in the ql/test directory |
| hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); |
| hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); |
| hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); |
| hiveConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, |
| "jdbc:derby:" + new File(workDir + "/metastore_db") + ";create=true"); |
| hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.toString(), |
| new File(workDir, "warehouse").toString()); |
| //set where derby logs |
| File derbyLogFile = new File(workDir + "/derby.log"); |
| derbyLogFile.createNewFile(); |
| System.setProperty("derby.stream.error.file", derbyLogFile.getPath()); |
| |
| |
| // Driver driver = new Driver(hiveConf); |
| // SessionState.start(new CliSessionState(hiveConf)); |
| |
| hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf); |
| } |
| |
| private static int findFreePort() throws IOException { |
| ServerSocket server = new ServerSocket(0); |
| int port = server.getLocalPort(); |
| server.close(); |
| return port; |
| } |
| |
| public static class Builder { |
| private File workDir; |
| private int numTaskTrackers = 1; |
| private JobConf jobConf; |
| private Configuration hbaseConf; |
| private HiveConf hiveConf; |
| |
| private boolean miniMRClusterEnabled = true; |
| private boolean miniHBaseClusterEnabled = true; |
| private boolean miniHiveMetastoreEnabled = true; |
| private boolean miniZookeeperClusterEnabled = true; |
| |
| |
| private Builder(File workDir) { |
| this.workDir = workDir; |
| } |
| |
| public Builder numTaskTrackers(int num) { |
| numTaskTrackers = num; |
| return this; |
| } |
| |
| public Builder jobConf(JobConf jobConf) { |
| this.jobConf = jobConf; |
| return this; |
| } |
| |
| public Builder hbaseConf(Configuration hbaseConf) { |
| this.hbaseConf = hbaseConf; |
| return this; |
| } |
| |
| public Builder hiveConf(HiveConf hiveConf) { |
| this.hiveConf = hiveConf; |
| return this; |
| } |
| |
| public Builder miniMRClusterEnabled(boolean enabled) { |
| this.miniMRClusterEnabled = enabled; |
| return this; |
| } |
| |
| public Builder miniHBaseClusterEnabled(boolean enabled) { |
| this.miniHBaseClusterEnabled = enabled; |
| return this; |
| } |
| |
| public Builder miniZookeeperClusterEnabled(boolean enabled) { |
| this.miniZookeeperClusterEnabled = enabled; |
| return this; |
| } |
| |
| public Builder miniHiveMetastoreEnabled(boolean enabled) { |
| this.miniHiveMetastoreEnabled = enabled; |
| return this; |
| } |
| |
| |
| public ManyMiniCluster build() { |
| return new ManyMiniCluster(this); |
| } |
| |
| } |
| } |