package edu.uci.ics.hivesterix.test.base;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import junit.framework.TestSuite;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.JobConf;

import edu.uci.ics.hivesterix.common.config.ConfUtil;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
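
/**
 * Base class for Hivesterix test suites. It boots a local two-node HDFS
 * MiniDFSCluster together with an in-process Hyracks cluster (one cluster
 * controller plus two node controllers), loads the TPC-H sample data, and
 * tears everything down again after the tests.
 *
 * A subclass is expected to drive the lifecycle roughly as follows (a
 * sketch only; the subclass name is hypothetical):
 *
 * <pre>
 * AbstractTestSuiteClass suite = new MyRuntimeFunctionTestSuite(); // hypothetical subclass
 * suite.setup();
 * suite.loadData();
 * // ... add and run the individual test cases ...
 * suite.cleanup();
 * </pre>
 */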
@SuppressWarnings("deprecation")
public abstract class AbstractTestSuiteClass extends TestSuite {
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/runtimefunctionts/hadoop/conf";
private static final String PATH_TO_HIVE_CONF = "src/test/resources/runtimefunctionts/hive/conf/hive-default.xml";
private static final String PATH_TO_CLUSTER_CONF = "src/test/resources/runtimefunctionts/hive/conf/topology.xml";
private static final String PATH_TO_DATA = "src/test/resources/runtimefunctionts/data/";

private static final String clusterPropertiesPath = "conf/cluster.properties";
private static final String masterFilePath = "conf/master";

private Properties clusterProps;
private MiniDFSCluster dfsCluster;
private JobConf conf = new JobConf();
protected FileSystem dfs;

/** number of HDFS data nodes and Hyracks node controllers to start */
private int numberOfNC = 2;

private ClusterControllerService cc;
private Map<String, NodeControllerService> ncs = new HashMap<String, NodeControllerService>();

/**
 * set up the test cluster: HDFS first, then Hyracks
 *
 * @throws Exception
 */
protected void setup() throws Exception {
setupHdfs();
setupHyracks();
}
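
/**
 * start a local two-node MiniDFSCluster, wire the resulting file system
 * into the shared JobConf/HiveConf via ConfUtil, and create the Hive
 * warehouse directory under /tmp/hivesterix
 */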
private void setupHdfs() throws IOException {
conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
HiveConf hconf = new HiveConf(SessionState.class);
hconf.addResource(new Path(PATH_TO_HIVE_CONF));
FileSystem lfs = FileSystem.getLocal(new Configuration());
lfs.delete(new Path("build"), true);
lfs.delete(new Path("metastore_db"), true);
System.setProperty("hadoop.log.dir", "logs");
dfsCluster = new MiniDFSCluster(hconf, numberOfNC, true, null);
dfs = dfsCluster.getFileSystem();
conf = new JobConf(hconf);
ConfUtil.setJobConf(conf);
String fsName = conf.get("fs.default.name");
hconf.set("hive.metastore.warehouse.dir", fsName.concat("/tmp/hivesterix"));
String warehouse = hconf.get("hive.metastore.warehouse.dir");
dfs.mkdirs(new Path(warehouse));
ConfUtil.setHiveConf(hconf);
}
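
/**
 * start an in-process Hyracks cluster: one cluster controller bound to
 * the address listed in conf/master, plus numberOfNC node controllers
 * on the loopback interface
 */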
private void setupHyracks() throws Exception {
// read hive conf
HiveConf hconf = new HiveConf(SessionState.class);
hconf.addResource(new Path(PATH_TO_HIVE_CONF));
SessionState.start(hconf);
// load the cluster properties file if it has not been loaded yet
if (clusterProps == null) {
clusterProps = new Properties();
InputStream confIn = new FileInputStream(clusterPropertiesPath);
clusterProps.load(confIn);
confIn.close();
}

// resolve the master host name from conf/master to an IPv4 address
BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
String masterNode = ipReader.readLine();
ipReader.close();
InetAddress[] ips = InetAddress.getAllByName(masterNode);
String ipAddress = null;
for (InetAddress ip : ips) {
// a raw address of four bytes is IPv4; skip IPv6 addresses
if (ip.getAddress().length <= 4) {
ipAddress = ip.getHostAddress();
}
}
int clientPort = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
int netPort = Integer.parseInt(clusterProps.getProperty("CC_CLUSTERPORT"));
// start hyracks cc
CCConfig ccConfig = new CCConfig();
ccConfig.clientNetIpAddress = ipAddress;
ccConfig.clientNetPort = clientPort;
ccConfig.clusterNetPort = netPort;
ccConfig.profileDumpPeriod = 1000;
// very large heartbeat settings so slow test machines never evict a node controller
ccConfig.heartbeatPeriod = 200000000;
ccConfig.maxHeartbeatLapsePeriods = 200000000;
ccConfig.clusterTopologyDefinition = new File(PATH_TO_CLUSTER_CONF);
cc = new ClusterControllerService(ccConfig);
cc.start();
// start hyracks nc
for (int i = 0; i < numberOfNC; i++) {
NCConfig ncConfig = new NCConfig();
ncConfig.ccHost = ipAddress;
ncConfig.clusterNetIPAddress = ipAddress;
ncConfig.ccPort = netPort;
// bind the data and result-distribution networks to loopback
ncConfig.dataIPAddress = "127.0.0.1";
ncConfig.datasetIPAddress = "127.0.0.1";
ncConfig.nodeId = "nc" + i;
NodeControllerService nc = new NodeControllerService(ncConfig);
nc.start();
ncs.put(ncConfig.nodeId, nc);
}
}
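
/**
 * create a directory in HDFS
 */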
protected void makeDir(String path) throws IOException {
dfs.mkdirs(new Path(path));
}
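
/**
 * copy a local file into an HDFS directory
 */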
protected void loadFiles(String src, String dest) throws IOException {
dfs.copyFromLocalFile(new Path(src), new Path(dest));
}
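
/**
 * tear down the test cluster: HDFS state first, then Hyracks
 *
 * @throws Exception
 */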
protected void cleanup() throws Exception {
cleanupHdfs();
cleanupHyracks();
}

/**
 * clean up HDFS state and shut down the mini DFS cluster
 */
private void cleanupHdfs() throws IOException {
dfs.delete(new Path("/"), true);
FileSystem.closeAll();
dfsCluster.shutdown();
}

/**
 * shut down the hyracks cluster: node controllers first, then the cluster controller
 */
private void cleanupHyracks() throws Exception {
Iterator<NodeControllerService> iterator = ncs.values().iterator();
while (iterator.hasNext()) {
NodeControllerService nc = iterator.next();
nc.stop();
}
cc.stop();
}
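
/**
 * read the ignore file, one pattern per line
 */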
protected static List<String> getIgnoreList(String ignorePath) throws FileNotFoundException, IOException {
List<String> ignores = new ArrayList<String>();
BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
try {
String s;
while ((s = reader.readLine()) != null) {
ignores.add(s);
}
} finally {
// close the reader even if reading fails
reader.close();
}
return ignores;
}
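
/**
 * check whether a query name matches any entry in the ignore list
 */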
protected static boolean isIgnored(String q, List<String> ignoreList) {
for (String ignore : ignoreList) {
if (q.contains(ignore)) {
return true;
}
}
return false;
}
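
/**
 * load the TPC-H tables and the join test sources into HDFS
 */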
protected void loadData() throws IOException {
makeDir("/tpch");
makeDir("/tpch/customer");
makeDir("/tpch/lineitem");
makeDir("/tpch/orders");
makeDir("/tpch/part");
makeDir("/tpch/partsupp");
makeDir("/tpch/supplier");
makeDir("/tpch/nation");
makeDir("/tpch/region");
makeDir("/test");
makeDir("/test/joinsrc1");
makeDir("/test/joinsrc2");
loadFiles(PATH_TO_DATA + "customer.tbl", "/tpch/customer/");
loadFiles(PATH_TO_DATA + "lineitem.tbl", "/tpch/lineitem/");
loadFiles(PATH_TO_DATA + "orders.tbl", "/tpch/orders/");
loadFiles(PATH_TO_DATA + "part.tbl", "/tpch/part/");
loadFiles(PATH_TO_DATA + "partsupp.tbl", "/tpch/partsupp/");
loadFiles(PATH_TO_DATA + "supplier.tbl", "/tpch/supplier/");
loadFiles(PATH_TO_DATA + "nation.tbl", "/tpch/nation/");
loadFiles(PATH_TO_DATA + "region.tbl", "/tpch/region/");
loadFiles(PATH_TO_DATA + "large_card_join_src.tbl", "/test/joinsrc1/");
loadFiles(PATH_TO_DATA + "large_card_join_src_small.tbl", "/test/joinsrc2/");
}
}