blob: dc0087b9a68eb1b563000a718bd73184a9fc35cd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.api.common;
import static org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER;
import java.io.File;
import java.net.Inet4Address;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.config.AsterixPropertiesAccessor;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.NodeControllerService;
public class AsterixHyracksIntegrationUtil {
static class LoggerHolder {
static final Logger LOGGER = Logger.getLogger(AsterixHyracksIntegrationUtil.class.getName());
private LoggerHolder() {
}
}
private static final String IO_DIR_KEY = "java.io.tmpdir";
public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
public ClusterControllerService cc;
public NodeControllerService[] ncs;
public IHyracksClientConnection hcc;
private AsterixPropertiesAccessor propertiesAccessor;
public void init(boolean deleteOldInstanceData) throws Exception {
ncs = new NodeControllerService[0]; // ensure that ncs is not null
propertiesAccessor = new AsterixPropertiesAccessor();
if (deleteOldInstanceData) {
deleteTransactionLogs();
removeTestStorageFiles();
}
cc = new ClusterControllerService(createCCConfig());
cc.start();
// Starts ncs.
List<String> nodes = propertiesAccessor.getNodeNames();
List<NodeControllerService> nodeControllers = new ArrayList<>();
for (String ncName : nodes) {
NodeControllerService nodeControllerService = new NodeControllerService(createNCConfig(ncName));
nodeControllers.add(nodeControllerService);
Thread ncStartThread = new Thread("IntegrationUtil-" + ncName) {
@Override
public void run() {
try {
nodeControllerService.start();
} catch (Exception e) {
LOGGER.log(Level.SEVERE, e.getMessage(), e);
}
}
};
ncStartThread.start();
ncStartThread.join();
}
hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort);
ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]);
}
protected CCConfig createCCConfig() {
CCConfig ccConfig = new CCConfig();
ccConfig.clusterNetIpAddress = Inet4Address.getLoopbackAddress().getHostAddress();
ccConfig.clientNetIpAddress = Inet4Address.getLoopbackAddress().getHostAddress();
ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
ccConfig.defaultMaxJobAttempts = 0;
ccConfig.resultTTL = 30000;
ccConfig.resultSweepThreshold = 1000;
ccConfig.appCCMainClass = CCApplicationEntryPoint.class.getName();
return ccConfig;
}
protected NCConfig createNCConfig(String ncName) throws AsterixException {
NCConfig ncConfig = new NCConfig();
ncConfig.ccHost = "localhost";
ncConfig.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
ncConfig.clusterNetIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
ncConfig.dataIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
ncConfig.resultIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
ncConfig.messagingIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
ncConfig.nodeId = ncName;
ncConfig.resultTTL = 30000;
ncConfig.resultSweepThreshold = 1000;
ncConfig.appArgs = Collections.singletonList("-virtual-NC");
String tempPath = System.getProperty(IO_DIR_KEY);
if (tempPath.endsWith(File.separator)) {
tempPath = tempPath.substring(0, tempPath.length() - 1);
}
System.err.println("Using the path: " + tempPath);
// get initial partitions from properties
String[] nodeStores = propertiesAccessor.getStores().get(ncName);
if (nodeStores == null) {
throw new AsterixException("Coudn't find stores for NC: " + ncName);
}
String tempDirPath = System.getProperty(IO_DIR_KEY);
if (!tempDirPath.endsWith(File.separator)) {
tempDirPath += File.separator;
}
for (int p = 0; p < nodeStores.length; p++) {
// create IO devices based on stores
String iodevicePath = tempDirPath + ncConfig.nodeId + File.separator + nodeStores[p];
File ioDeviceDir = new File(iodevicePath);
ioDeviceDir.mkdirs();
if (p == 0) {
ncConfig.ioDevices = iodevicePath;
} else {
ncConfig.ioDevices += "," + iodevicePath;
}
}
ncConfig.appNCMainClass = NCApplicationEntryPoint.class.getName();
return ncConfig;
}
public String[] getNcNames() {
return propertiesAccessor.getNodeNames().toArray(new String[propertiesAccessor.getNodeNames().size()]);
}
public IHyracksClientConnection getHyracksClientConnection() {
return hcc;
}
public void deinit(boolean deleteOldInstanceData) throws Exception {
//stop NCs
ArrayList<Thread> stopNCThreads = new ArrayList<>();
for (NodeControllerService nodeControllerService : ncs) {
if (nodeControllerService != null) {
Thread ncStopThread = new Thread() {
@Override
public void run() {
try {
nodeControllerService.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
};
stopNCThreads.add(ncStopThread);
ncStopThread.start();
}
}
//make sure all NCs stopped
for (Thread stopNcTheard : stopNCThreads) {
stopNcTheard.join();
}
if (cc != null) {
cc.stop();
}
if (deleteOldInstanceData) {
deleteTransactionLogs();
removeTestStorageFiles();
}
}
public void runJob(JobSpecification spec) throws Exception {
GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString());
JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
GlobalConfig.ASTERIX_LOGGER.info(jobId.toString());
hcc.waitForCompletion(jobId);
}
public void removeTestStorageFiles() {
File dir = new File(System.getProperty(IO_DIR_KEY));
for (String ncName : propertiesAccessor.getNodeNames()) {
File ncDir = new File(dir, ncName);
FileUtils.deleteQuietly(ncDir);
}
}
private void deleteTransactionLogs() throws Exception {
for (String ncId : propertiesAccessor.getNodeNames()) {
File log = new File(propertiesAccessor.getTransactionLogDirs().get(ncId));
if (log.exists()) {
FileUtils.deleteDirectory(log);
}
}
}
/**
* main method to run a simple 2 node cluster in-process
* suggested VM arguments: <code>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8</code>
*
* @param args
* unused
*/
public static void main(String[] args) {
AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
run(integrationUtil, Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"));
}
protected static void run(final AsterixHyracksIntegrationUtil integrationUtil, boolean cleanupOnStart,
boolean cleanupOnShutdown) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
integrationUtil.deinit(cleanupOnShutdown);
} catch (Exception e) {
e.printStackTrace();
}
}
});
try {
System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
integrationUtil.init(cleanupOnStart);
while (true) {
Thread.sleep(10000);
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}