blob: c2b523612c52485188dfe7ecc604ae4b7d6206e0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.api.common;
import static org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER;
import static org.apache.hyracks.util.file.FileUtil.joinPath;
import java.io.File;
import java.io.IOException;
import java.net.Inet4Address;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
import org.apache.asterix.app.io.PersistedResourceRegistry;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.PropertiesAccessor;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.hyracks.bootstrap.CCApplication;
import org.apache.asterix.hyracks.bootstrap.NCApplication;
import org.apache.asterix.lang.common.util.ExpressionUtils;
import org.apache.asterix.test.dataflow.TestLsmIoOpCallbackFactory;
import org.apache.asterix.test.dataflow.TestPrimaryIndexOperationTrackerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.application.INCApplication;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.ControllerConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtreeLocalResource;
import org.apache.hyracks.test.support.TestUtils;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.kohsuke.args4j.CmdLineException;
@SuppressWarnings({ "squid:ClassVariableVisibilityCheck", "squid:S00112" })
public class AsterixHyracksIntegrationUtil {
public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
public static final String RESOURCES_PATH = joinPath(getProjectPath().toString(), "src", "test", "resources");
public static final String DEFAULT_CONF_FILE = joinPath(RESOURCES_PATH, "cc.conf");
private static final String DEFAULT_STORAGE_PATH = joinPath("target", "io", "dir");
private static String storagePath = DEFAULT_STORAGE_PATH;
private static final long RESULT_TTL = TimeUnit.MINUTES.toMillis(30);
static {
System.setProperty("java.util.logging.manager", org.apache.logging.log4j.jul.LogManager.class.getName());
}
public ClusterControllerService cc;
public NodeControllerService[] ncs = new NodeControllerService[2];
public IHyracksClientConnection hcc;
protected boolean gracefulShutdown = true;
List<Pair<IOption, Object>> opts = new ArrayList<>();
private ConfigManager configManager;
private List<String> nodeNames;
public static void setStoragePath(String path) {
storagePath = path;
}
public static void restoreDefaultStoragePath() {
storagePath = DEFAULT_STORAGE_PATH;
}
/**
* main method to run a simple 2 node cluster in-process
* suggested VM arguments: <code>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8</code>
*
* @param args unused
*/
public static void main(String[] args) throws Exception {
TestUtils.redirectLoggingToConsole();
AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
try {
integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"),
getConfPath());
} catch (Exception e) {
LOGGER.fatal("Unexpected exception", e);
System.exit(1);
}
}
public void init(boolean deleteOldInstanceData, String confFile) throws Exception { //NOSONAR
configureExternalLibDir();
final ICCApplication ccApplication = createCCApplication();
if (confFile == null) {
configManager = new ConfigManager();
} else {
configManager = new ConfigManager(new String[] { "-config-file", confFile });
}
ccApplication.registerConfig(configManager);
final CCConfig ccConfig = createCCConfig(configManager);
configManager.processConfig();
ccConfig.setKeyStorePath(joinPath(RESOURCES_PATH, ccConfig.getKeyStorePath()));
ccConfig.setTrustStorePath(joinPath(RESOURCES_PATH, ccConfig.getTrustStorePath()));
cc = new ClusterControllerService(ccConfig, ccApplication);
nodeNames = ccConfig.getConfigManager().getNodeNames();
if (deleteOldInstanceData && nodeNames != null) {
deleteTransactionLogs();
removeTestStorageFiles();
deleteCCFiles();
}
final List<NodeControllerService> nodeControllers = new ArrayList<>();
for (String nodeId : ExpressionUtils.emptyIfNull(nodeNames)) {
// mark this NC as virtual, so that the CC doesn't try to start via NCService...
configManager.set(nodeId, NCConfig.Option.NCSERVICE_PORT, NCConfig.NCSERVICE_PORT_DISABLED);
final INCApplication ncApplication = createNCApplication();
ConfigManager ncConfigManager;
if (confFile == null) {
ncConfigManager = new ConfigManager();
} else {
ncConfigManager = new ConfigManager(new String[] { "-config-file", confFile });
}
ncApplication.registerConfig(ncConfigManager);
opts.forEach(opt -> ncConfigManager.set(nodeId, opt.getLeft(), opt.getRight()));
nodeControllers
.add(new NodeControllerService(fixupPaths(createNCConfig(nodeId, ncConfigManager)), ncApplication));
}
opts.forEach(opt -> configManager.set(opt.getLeft(), opt.getRight()));
cc.start();
// Starts ncs.
nodeNames = ccConfig.getConfigManager().getNodeNames();
List<Thread> startupThreads = new ArrayList<>();
for (NodeControllerService nc : nodeControllers) {
Thread ncStartThread = new Thread("IntegrationUtil-" + nc.getId()) {
@Override
public void run() {
try {
nc.start();
} catch (Exception e) {
LOGGER.log(Level.ERROR, e.getMessage(), e);
}
}
};
ncStartThread.start();
startupThreads.add(ncStartThread);
}
//wait until all NCs complete their startup
for (Thread thread : startupThreads) {
thread.join();
}
// Wait until cluster becomes active
((ICcApplicationContext) cc.getApplicationContext()).getClusterStateManager().waitForState(ClusterState.ACTIVE);
hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), cc.getConfig().getClientListenPort(),
cc.getNetworkSecurityManager().getSocketChannelFactory());
this.ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]);
}
private void configureExternalLibDir() {
// hack to ensure we have a unique location for external libraries in our tests (asterix cluster has a shared
// home directory)-- TODO: rework this once the external lib dir can be configured explicitly
String appHome = joinPath(System.getProperty("app.home", System.getProperty("user.home")),
"appHome" + (int) (Math.random() * Integer.MAX_VALUE));
LOGGER.info("setting app.home to {}", appHome);
System.setProperty("app.home", appHome);
new File(appHome).deleteOnExit();
}
public void init(boolean deleteOldInstanceData, String externalLibPath, String confDir) throws Exception {
List<ILibraryManager> libraryManagers = new ArrayList<>();
init(deleteOldInstanceData, confDir);
if (externalLibPath != null && externalLibPath.length() != 0) {
for (NodeControllerService nc : ncs) {
INcApplicationContext runtimeCtx = (INcApplicationContext) nc.getApplicationContext();
libraryManagers.add(runtimeCtx.getLibraryManager());
}
}
}
public ClusterControllerService getClusterControllerService() {
return cc;
}
protected CCConfig createCCConfig(ConfigManager configManager) throws IOException {
CCConfig ccConfig = new CCConfig(configManager);
ccConfig.setClusterListenAddress(Inet4Address.getLoopbackAddress().getHostAddress());
ccConfig.setClientListenAddress(Inet4Address.getLoopbackAddress().getHostAddress());
ccConfig.setClientListenPort(DEFAULT_HYRACKS_CC_CLIENT_PORT);
ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
ccConfig.setResultTTL(RESULT_TTL);
ccConfig.setResultSweepThreshold(1000L);
ccConfig.setEnforceFrameWriterProtocol(true);
configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb"));
return ccConfig;
}
protected ICCApplication createCCApplication() {
return new CCApplication();
}
protected NCConfig createNCConfig(String ncName, ConfigManager configManager) {
NCConfig ncConfig = new NCConfig(ncName, configManager);
ncConfig.setClusterAddress("localhost");
ncConfig.setClusterPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
ncConfig.setClusterListenAddress(Inet4Address.getLoopbackAddress().getHostAddress());
ncConfig.setDataListenAddress(Inet4Address.getLoopbackAddress().getHostAddress());
ncConfig.setResultListenAddress(Inet4Address.getLoopbackAddress().getHostAddress());
ncConfig.setMessagingListenAddress(Inet4Address.getLoopbackAddress().getHostAddress());
ncConfig.setResultTTL(RESULT_TTL);
ncConfig.setResultSweepThreshold(1000L);
ncConfig.setVirtualNC();
configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb", ncName));
return ncConfig;
}
protected INCApplication createNCApplication() {
// Instead of using this flag, RecoveryManagerTest should set the desired class in its config file
if (!gracefulShutdown) {
return new UngracefulShutdownNCApplication();
}
return new TestNCApplication();
}
private NCConfig fixupPaths(NCConfig ncConfig) throws IOException, AsterixException, CmdLineException {
// we have to first process the config
ncConfig.getConfigManager().processConfig();
// get initial partitions from config
String[] nodeStores = ncConfig.getNodeScopedAppConfig().getStringArray(NCConfig.Option.IODEVICES);
if (nodeStores == null) {
throw new IllegalStateException("Couldn't find stores for NC: " + ncConfig.getNodeId());
}
LOGGER.info("Using the path: " + getDefaultStoragePath());
for (int i = 0; i < nodeStores.length; i++) {
// create IO devices based on stores
nodeStores[i] = joinPath(getDefaultStoragePath(), ncConfig.getNodeId(), nodeStores[i]);
}
ncConfig.getConfigManager().set(ncConfig.getNodeId(), NCConfig.Option.IODEVICES, nodeStores);
final String keyStorePath = joinPath(RESOURCES_PATH, ncConfig.getKeyStorePath());
final String trustStorePath = joinPath(RESOURCES_PATH, ncConfig.getTrustStorePath());
ncConfig.getConfigManager().set(ncConfig.getNodeId(), NCConfig.Option.KEY_STORE_PATH, keyStorePath);
ncConfig.getConfigManager().set(ncConfig.getNodeId(), NCConfig.Option.TRUST_STORE_PATH, trustStorePath);
return ncConfig;
}
public IHyracksClientConnection getHyracksClientConnection() {
return hcc;
}
public void deinit(boolean deleteOldInstanceData) throws Exception {
//stop NCs
ArrayList<Thread> stopNCThreads = new ArrayList<>();
for (NodeControllerService nodeControllerService : ncs) {
if (nodeControllerService != null) {
Thread ncStopThread = new Thread() {
@Override
public void run() {
try {
nodeControllerService.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
};
stopNCThreads.add(ncStopThread);
ncStopThread.start();
}
}
//make sure all NCs stopped
for (Thread stopNcTheard : stopNCThreads) {
stopNcTheard.join();
}
stopCC(false);
if (deleteOldInstanceData && nodeNames != null) {
deleteTransactionLogs();
removeTestStorageFiles();
deleteCCFiles();
}
}
public void stopCC(boolean terminateNCService) throws Exception {
if (cc != null) {
cc.stop(terminateNCService);
cc = null;
}
}
public void setGracefulShutdown(boolean gracefulShutdown) {
this.gracefulShutdown = gracefulShutdown;
}
protected String getDefaultStoragePath() {
return storagePath;
}
public void removeTestStorageFiles() {
File dir = new File(getDefaultStoragePath());
for (String ncName : nodeNames) {
File ncDir = new File(dir, ncName);
FileUtils.deleteQuietly(ncDir);
}
}
private void deleteTransactionLogs() throws IOException, AsterixException {
for (String ncId : nodeNames) {
File log = new File(
PropertiesAccessor.getInstance(configManager.getAppConfig()).getTransactionLogDirs().get(ncId));
if (log.exists()) {
FileUtils.deleteDirectory(log);
}
}
}
private void deleteCCFiles() {
if (cc != null) {
FileUtils.deleteQuietly(new File(cc.getCCConfig().getRootDir()));
}
}
protected void run(boolean cleanupOnStart, boolean cleanupOnShutdown, String loadExternalLibs, String confFile)
throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
deinit(cleanupOnShutdown);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Unexpected exception on shutdown", e);
}
}
});
init(cleanupOnStart, confFile);
while (true) {
Thread.sleep(10000);
}
}
protected void run(boolean cleanupOnStart, boolean cleanupOnShutdown, String loadExternalLibs) throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
deinit(cleanupOnShutdown);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Unexpected exception on shutdown", e);
}
}
});
init(cleanupOnStart, loadExternalLibs);
while (true) {
Thread.sleep(10000);
}
}
public void addOption(IOption name, Object value) {
opts.add(Pair.of(name, value));
}
public void clearOptions() {
opts.clear();
}
/**
* @return the asterix-app absolute path if found, otherwise the default user path.
*/
static Path getProjectPath() {
final String targetDir = "asterix-app";
final BiPredicate<Path, BasicFileAttributes> matcher =
(path, attributes) -> path.getFileName().toString().equals(targetDir) && path.toFile().isDirectory()
&& path.toFile().list((d, n) -> n.equals("pom.xml")).length == 1;
final Path currentPath = Paths.get(System.getProperty("user.dir"));
try (Stream<Path> pathStream = Files.find(currentPath, 10, matcher)) {
return pathStream.findFirst().orElse(currentPath);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
static class LoggerHolder {
static final Logger LOGGER = LogManager.getLogger();
private LoggerHolder() {
}
}
private class TestNCApplication extends NCApplication {
@Override
protected void configurePersistedResourceRegistry() {
ncServiceCtx.setPersistedResourceRegistry(new TestPersistedResourceRegistry());
}
}
private class UngracefulShutdownNCApplication extends TestNCApplication {
@Override
public void stop() throws Exception {
// ungraceful shutdown
webManager.stop();
super.stop();
}
}
private static class TestPersistedResourceRegistry extends PersistedResourceRegistry {
@Override
protected void registerClasses() {
super.registerClasses();
registeredClasses.put("TestLsmBtreeLocalResource", TestLsmBtreeLocalResource.class);
registeredClasses.put("TestLsmIoOpCallbackFactory", TestLsmIoOpCallbackFactory.class);
registeredClasses.put("TestPrimaryIndexOperationTrackerFactory",
TestPrimaryIndexOperationTrackerFactory.class);
}
}
private static String getConfPath() {
String providedPath = System.getProperty("conf.path");
if (providedPath == null) {
return DEFAULT_CONF_FILE;
}
return joinPath(RESOURCES_PATH, providedPath);
}
}