// blob: c148c92e26156d925b9b13b4deac8c1abd746eb6
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.hyracks.bootstrap;
import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
import static org.apache.asterix.common.utils.Servlets.QUERY_RESULT;
import static org.apache.asterix.common.utils.Servlets.QUERY_SERVICE;
import static org.apache.asterix.common.utils.Servlets.QUERY_STATUS;
import static org.apache.asterix.common.utils.Servlets.UDF;
import static org.apache.asterix.common.utils.Servlets.UDF_RECOVERY;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.api.http.server.BasicAuthServlet;
import org.apache.asterix.api.http.server.NCQueryServiceServlet;
import org.apache.asterix.api.http.server.NCUdfApiServlet;
import org.apache.asterix.api.http.server.NCUdfRecoveryServlet;
import org.apache.asterix.api.http.server.NetDiagnosticsApiServlet;
import org.apache.asterix.api.http.server.QueryResultApiServlet;
import org.apache.asterix.api.http.server.QueryStatusApiServlet;
import org.apache.asterix.api.http.server.ServletConstants;
import org.apache.asterix.api.http.server.StorageApiServlet;
import org.apache.asterix.app.config.ConfigValidator;
import org.apache.asterix.app.io.PersistedResourceRegistry;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.app.nc.NCExtensionManager;
import org.apache.asterix.app.nc.RecoveryManager;
import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.IConfigValidatorFactory;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.api.IPropertiesFactory;
import org.apache.asterix.common.api.IReceptionistFactory;
import org.apache.asterix.common.config.AsterixExtension;
import org.apache.asterix.common.config.ExtensionProperties;
import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.MessagingProperties;
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.config.NodeProperties;
import org.apache.asterix.common.config.PropertiesAccessor;
import org.apache.asterix.common.config.PropertiesFactory;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.transactions.IRecoveryManagerFactory;
import org.apache.asterix.common.utils.PrintUtil;
import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
import org.apache.asterix.messaging.NCMessageBroker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.translator.Receptionist;
import org.apache.asterix.util.MetadataBuiltinFunctions;
import org.apache.asterix.utils.RedactionUtil;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.BaseNCApplication;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.HttpServerConfig;
import org.apache.hyracks.http.server.HttpServerConfigBuilder;
import org.apache.hyracks.http.server.WebManager;
import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.hyracks.util.LoggingConfigUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class NCApplication extends BaseNCApplication {
private static final Logger LOGGER = LogManager.getLogger();
protected NCExtensionManager ncExtensionManager;
protected INcApplicationContext runtimeContext;
private String nodeId;
private boolean stopInitiated;
protected boolean startupCompleted;
protected WebManager webManager;
private HttpServer apiServer;
/**
 * Registers configuration options with the supplied manager: the base NC
 * application's options first, followed by those registered through
 * {@link ApplicationConfigurator#registerConfigOptions}.
 *
 * @param configManager the configuration manager to register options with
 */
@Override
public void registerConfig(IConfigManager configManager) {
    // base-class options go in first
    super.registerConfig(configManager);
    ApplicationConfigurator.registerConfigOptions(configManager);
}
/**
 * Prepares this application before it is started: applies the configured log
 * level, marks the node as {@link NodeStatus#IDLE}, wraps the service context's
 * thread factory in an {@link AsterixThreadFactory}, validates the environment
 * and installs the persisted resource registry.
 *
 * @param serviceCtx the service context handed to the base application
 * @throws Exception if the base initialization or the environment validation fails
 */
@Override
public void init(IServiceContext serviceCtx) throws Exception {
    super.init(serviceCtx);
    final Level configuredLevel = ncServiceCtx.getAppConfig().getLoggingLevel(ExternalProperties.Option.LOG_LEVEL);
    configureLoggingLevel(configuredLevel);
    // the node starts out as idle to indicate that it is still pending booting
    final NodeControllerService ncs = (NodeControllerService) serviceCtx.getControllerService();
    ncs.setNodeStatus(NodeStatus.IDLE);
    final AsterixThreadFactory threadFactory =
            new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager());
    ncServiceCtx.setThreadFactory(threadFactory);
    validateEnvironment();
    configurePersistedResourceRegistry();
}
/**
 * Starts the node controller application.
 * <p>
 * In order: rejects any arguments, defaults the RMI hostname system property
 * to the cluster public address, creates the extension manager and the runtime
 * context, registers this node in the configuration if it is not among the
 * known node names, initializes the runtime context, installs the message
 * broker and messaging channel factory, verifies that the latest checkpoint
 * (if any) matches {@link StorageConstants#VERSION}, creates the web manager
 * and finally performs local cleanup.
 *
 * @param args application arguments; must be empty
 * @throws IllegalArgumentException if any argument is supplied
 * @throws IllegalStateException if the on-disk storage version differs from the current one
 * @throws Exception if any of the startup steps fails
 */
@Override
public void start(String[] args) throws Exception {
    if (args.length > 0) {
        throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
    }
    nodeId = ncServiceCtx.getNodeId();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Starting Asterix node controller: " + nodeId);
    }
    final NodeControllerService ncs = (NodeControllerService) ncServiceCtx.getControllerService();
    // only default the RMI hostname when nobody configured it explicitly
    final String rmiHostnameKey = "java.rmi.server.hostname";
    if (System.getProperty(rmiHostnameKey) == null) {
        System.setProperty(rmiHostnameKey, ncs.getConfiguration().getClusterPublicAddress());
    }
    MetadataBuiltinFunctions.init();

    final List<AsterixExtension> extensions = new ArrayList<>(getExtensions());
    ncExtensionManager = new NCExtensionManager(extensions);
    runtimeContext = new NCAppRuntimeContext(ncServiceCtx, ncExtensionManager, getPropertiesFactory());

    final MetadataProperties metadataProperties = runtimeContext.getMetadataProperties();
    final boolean knownNode = metadataProperties.getNodeNames().contains(nodeId);
    if (!knownNode) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Substitute node joining : " + nodeId);
        }
        updateOnNodeJoin();
    }
    runtimeContext.initialize(getRecoveryManagerFactory(), getReceptionistFactory(), getConfigValidatorFactory(),
            runtimeContext.getNodeProperties().isInitialRun());

    final MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
    final NCMessageBroker broker = new NCMessageBroker(ncs, messagingProperties);
    ncServiceCtx.setMessageBroker(broker);
    ncServiceCtx.setMessagingChannelInterfaceFactory(
            new MessagingChannelInterfaceFactory(broker, messagingProperties));

    // refuse to run against storage written with a different storage version
    final Checkpoint checkpoint = runtimeContext.getTransactionSubsystem().getCheckpointManager().getLatest();
    final boolean versionMismatch =
            checkpoint != null && checkpoint.getStorageVersion() != StorageConstants.VERSION;
    if (versionMismatch) {
        throw new IllegalStateException(
                String.format("Storage version mismatch.. Current version (%s). On disk version: (%s)",
                        StorageConstants.VERSION, checkpoint.getStorageVersion()));
    }
    if (LOGGER.isInfoEnabled()) {
        final SystemState currentState =
                runtimeContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
        LOGGER.info("System state: " + currentState);
        LOGGER.info("Node ID: " + nodeId);
        LOGGER.info("Stores: " + PrintUtil.toString(metadataProperties.getStores()));
    }
    webManager = new WebManager();
    performLocalCleanUp();
}
/**
 * Supplies the factory handed to the runtime context for creating the
 * recovery manager. The default is backed by the {@link RecoveryManager}
 * constructor.
 *
 * @return the recovery manager factory
 */
protected IRecoveryManagerFactory getRecoveryManagerFactory() {
    return RecoveryManager::new;
}
/**
 * Supplies the factory handed to the runtime context for creating the
 * receptionist. The default is backed by the {@link Receptionist} constructor.
 *
 * @return the receptionist factory
 */
protected IReceptionistFactory getReceptionistFactory() {
    return Receptionist::new;
}
/**
 * Supplies the factory handed to the runtime context for creating the
 * configuration validator. The default is backed by the
 * {@link ConfigValidator} constructor.
 *
 * @return the configuration validator factory
 */
protected IConfigValidatorFactory getConfigValidatorFactory() {
    return ConfigValidator::new;
}
/**
 * Applies the given level through the base application, uses it as the
 * default for the Asterix logger ({@link GlobalConfig#ASTERIX_LOGGER_NAME})
 * when that logger has no level of its own, and installs the Asterix log
 * redactor.
 *
 * @param level the logging level to apply
 */
@Override
public void configureLoggingLevel(Level level) {
    super.configureLoggingLevel(level);
    // only fills in the Asterix logger level if none is configured
    LoggingConfigUtil.defaultIfMissing(GlobalConfig.ASTERIX_LOGGER_NAME, level);
    LogRedactionUtil.setRedactor(RedactionUtil.LOG_REDACTOR);
}
/**
 * Creates the NC HTTP API server on the configured NC API port, publishes the
 * service context, Hyracks connection and credential map as server
 * attributes, registers the NC servlets and hands the server to the web
 * manager.
 * <p>
 * The UDF and UDF-recovery servlets are wrapped in a {@link BasicAuthServlet}.
 * The credential map attribute is set before the system auth header is
 * generated, matching the original statement order.
 *
 * @throws Exception if the server or any servlet cannot be set up
 */
protected void configureServers() throws Exception {
    final INcApplicationContext appCtx = getApplicationContext();
    final ExternalProperties externalProperties = appCtx.getExternalProperties();
    final HttpServerConfig serverConfig =
            HttpServerConfigBuilder.custom().setMaxRequestSize(externalProperties.getMaxWebRequestSize()).build();
    apiServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getNcApiPort(),
            serverConfig);
    apiServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ncServiceCtx);
    apiServer.setAttribute(HYRACKS_CONNECTION_ATTR, appCtx.getHcc());

    apiServer.addServlet(new StorageApiServlet(apiServer.ctx(), appCtx, Servlets.STORAGE));
    apiServer.addServlet(new NetDiagnosticsApiServlet(apiServer.ctx(), appCtx, Servlets.NET_DIAGNOSTICS));

    final ILangCompilationProvider sqlppProvider =
            ncExtensionManager.getCompilationProvider(ILangExtension.Language.SQLPP);
    final NCQueryServiceServlet queryServlet = new NCQueryServiceServlet(apiServer.ctx(),
            new String[] { QUERY_SERVICE }, appCtx, sqlppProvider.getLanguage(), sqlppProvider, null);
    apiServer.addServlet(queryServlet);

    // credentials are read from the file configured on the node controller
    final NCConfig ncConfig = ((NodeControllerService) ncServiceCtx.getControllerService()).getConfiguration();
    final Map<String, String> credentials = parseCredentialMap(ncConfig.getCredentialFilePath());
    apiServer.setAttribute(ServletConstants.CREDENTIAL_MAP, credentials);

    final Pair<Map<String, String>, Map<String, String>> sysAuth =
            BasicAuthServlet.generateSysAuthHeader(apiServer.ctx());
    final NCUdfApiServlet udfServlet = new NCUdfApiServlet(apiServer.ctx(), new String[] { UDF }, appCtx,
            apiServer.getScheme(), apiServer.getAddress().getPort());
    apiServer.addServlet(
            new BasicAuthServlet(apiServer.ctx(), udfServlet, sysAuth.getFirst(), sysAuth.getSecond()));
    final NCUdfRecoveryServlet udfRecoveryServlet = new NCUdfRecoveryServlet(apiServer.ctx(),
            new String[] { UDF_RECOVERY }, appCtx, apiServer.getScheme(), apiServer.getAddress().getPort());
    apiServer.addServlet(
            new BasicAuthServlet(apiServer.ctx(), udfRecoveryServlet, sysAuth.getFirst(), sysAuth.getSecond()));

    apiServer.addServlet(new QueryStatusApiServlet(apiServer.ctx(), appCtx, QUERY_STATUS));
    apiServer.addServlet(new QueryResultApiServlet(apiServer.ctx(), appCtx, QUERY_RESULT));
    webManager.add(apiServer);
}
/**
 * Reads the extensions declared in the application configuration.
 *
 * @return the extensions exposed by {@link ExtensionProperties} for this node's application config
 * @throws Exception if the properties accessor cannot be obtained
 */
protected List<AsterixExtension> getExtensions() throws Exception {
    final PropertiesAccessor accessor = PropertiesAccessor.getInstance(ncServiceCtx.getAppConfig());
    final ExtensionProperties extensionProperties = new ExtensionProperties(accessor);
    return extensionProperties.getExtensions();
}
/**
 * Builds the properties factory passed to the runtime context, backed by the
 * {@link PropertiesAccessor} of this node's application config.
 *
 * @return a new {@link PropertiesFactory}
 * @throws IOException declared by the properties accessor lookup
 * @throws AsterixException declared by the properties accessor lookup
 */
protected IPropertiesFactory getPropertiesFactory() throws IOException, AsterixException {
    return new PropertiesFactory(PropertiesAccessor.getInstance(ncServiceCtx.getAppConfig()));
}
/**
 * Stops the application. The first call flags the runtime context as shutting
 * down, stops the web manager, performs local cleanup, stops all life-cycle
 * components and deinitializes the runtime context. Later calls only log
 * that the duplicate attempt was ignored. The base application's
 * {@code stop()} is invoked on every call.
 *
 * @throws Exception if any teardown step fails
 */
@Override
public void stop() throws Exception {
    if (stopInitiated) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Duplicate attempt to stop ignored: " + nodeId);
        }
    } else {
        runtimeContext.setShuttingdown(true);
        stopInitiated = true;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Stopping Asterix node controller: " + nodeId);
        }
        webManager.stop();
        // remove any temporary files
        performLocalCleanUp();
        // note: stopping the recovery manager will make a sharp checkpoint
        ncServiceCtx.getLifeCycleComponentManager().stopAll(false);
        runtimeContext.deinitialize();
    }
    super.stop();
}
/**
 * Forwards the pre-stop notification to the runtime context.
 *
 * @throws Exception if the runtime context's pre-stop handling fails
 */
@Override
public void preStop() throws Exception {
    runtimeContext.preStop();
}
/**
 * Finishes startup: after the base application's handling, configures and
 * starts the HTTP servers, sets the {@code startupCompleted} flag and wakes
 * any thread waiting on this object's monitor (see
 * {@link #tasksCompleted(CcId)}).
 *
 * @throws Exception if the servers cannot be configured or started
 */
@Override
public synchronized void startupCompleted() throws Exception {
    super.startupCompleted();
    // servlets are configured only after joining the cluster, so that a
    // HyracksClientConnection can be created
    configureServers();
    webManager.start();
    startupCompleted = true;
    // release callers blocked in tasksCompleted()
    notifyAll();
}
/**
 * Sends a registration tasks request to the given cluster controller once
 * this application has completed startup.
 * <p>
 * The call blocks on this object's monitor until {@link #startupCompleted()}
 * has set the {@code startupCompleted} flag. The reported system state is the
 * recovery manager's current state when startup tasks are still pending for
 * {@code ccId} (node is {@link NodeStatus#IDLE} and {@code ccId} is the
 * primary CC, or no primary CC is known); otherwise
 * {@link SystemState#HEALTHY} is reported. When an API server exists, its
 * {@code SYS_AUTH_HEADER} context value is included in the secrets map;
 * otherwise the map is empty.
 *
 * @param ccId the cluster controller the request is sent to
 * @throws Exception if waiting is interrupted or the request cannot be sent
 */
@Override
public synchronized void tasksCompleted(CcId ccId) throws Exception {
    // loop guards against spurious wake-ups
    while (!startupCompleted) {
        wait();
    }
    final NodeControllerService ncs = (NodeControllerService) ncServiceCtx.getControllerService();
    final NodeStatus currentStatus = ncs.getNodeStatus();
    final SystemState systemState = isPendingStartupTasks(currentStatus, ncs.getPrimaryCcId(), ccId)
            ? getCurrentSystemState() : SystemState.HEALTHY;
    // parameterized instead of a raw Map so the compiler checks key/value types
    final Map<String, Object> httpSecrets =
            apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
                    : Collections.emptyMap();
    RegistrationTasksRequestMessage.send(ccId, ncs, currentStatus, systemState, httpSecrets);
}
/**
 * Reports this node's capacity: the job execution memory budget from the
 * storage properties and the number of processors available to the JVM.
 *
 * @return the node capacity
 */
@Override
public NodeCapacity getCapacity() {
    final long jobMemoryBudget = runtimeContext.getStorageProperties().getJobExecutionMemoryBudget();
    final int availableCores = Runtime.getRuntime().availableProcessors();
    return new NodeCapacity(jobMemoryBudget, availableCores);
}
/**
 * Cleans up local state: deletes workspace files, deletes corrupted
 * resources from the local resource repository and runs the repository's
 * cleanup for every partition reported by the replica manager.
 *
 * @throws HyracksDataException if any cleanup step fails
 */
private void performLocalCleanUp() throws HyracksDataException {
    // delete working area files left over from failed jobs
    runtimeContext.getIoManager().deleteWorkspaceFiles();
    // reclaim storage for orphaned index artifacts in NCs
    final Set<Integer> partitions = runtimeContext.getReplicaManager().getPartitions();
    final PersistentLocalResourceRepository resourceRepo =
            (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
    resourceRepo.deleteCorruptedResources();
    for (Integer partitionId : partitions) {
        resourceRepo.cleanup(partitionId);
    }
}
/**
 * Ensures this node is present in the node controller's configuration
 * manager when it is not among the node names listed in the metadata
 * properties. Does nothing for a node that is already listed.
 */
private void updateOnNodeJoin() {
    final boolean alreadyKnown = runtimeContext.getMetadataProperties().getNodeNames().contains(nodeId);
    if (alreadyKnown) {
        return;
    }
    final NodeControllerService ncs = (NodeControllerService) ncServiceCtx.getControllerService();
    final NCConfig config = ncs.getConfiguration();
    config.getConfigManager().ensureNode(nodeId);
}
/**
 * Exposes the runtime context created in {@code start}.
 *
 * @return the NC application context, or {@code null} if it has not been created yet
 */
@Override
public INcApplicationContext getApplicationContext() {
    return runtimeContext;
}
/**
 * Returns a resolver that picks the IO device for a relative path by taking
 * the partition number extracted from the path modulo the number of devices
 * and using the absolute value of that remainder as the device index.
 *
 * @return the file-to-device resolver
 */
@Override
public IFileDeviceResolver getFileDeviceResolver() {
    return (relPath, devices) -> {
        final int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(relPath);
        // abs() keeps the index non-negative when the remainder is negative
        return devices.get(Math.abs(partitionNum % devices.size()));
    };
}
/**
 * Tells whether startup tasks are still pending with respect to the
 * registering cluster controller.
 *
 * @param nodeStatus the node's current status
 * @param primaryCc the primary CC id, possibly {@code null}
 * @param registeredCc the CC id being registered with
 * @return {@code true} when the node is {@link NodeStatus#IDLE} and either no
 *         primary CC is set or the primary CC equals {@code registeredCc}
 */
private boolean isPendingStartupTasks(NodeStatus nodeStatus, CcId primaryCc, CcId registeredCc) {
    if (nodeStatus != NodeStatus.IDLE) {
        return false;
    }
    return primaryCc == null || primaryCc.equals(registeredCc);
}
/**
 * Reads the system state from the recovery manager, reporting
 * {@link SystemState#BOOTSTRAPPING} instead of
 * {@link SystemState#PERMANENT_DATA_LOSS} when the node is on its initial run
 * or is a virtual NC.
 *
 * @return the (possibly adjusted) current system state
 */
private SystemState getCurrentSystemState() {
    final NodeProperties nodeProps = runtimeContext.getNodeProperties();
    final SystemState reported =
            runtimeContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
    if (reported != SystemState.PERMANENT_DATA_LOSS) {
        return reported;
    }
    // Since the initial run flag is not passed in AsterixHyracksIntegrationUtil, the virtualNC flag is used too
    final boolean freshNode = nodeProps.isInitialRun() || nodeProps.isVirtualNc();
    return freshNode ? SystemState.BOOTSTRAPPING : reported;
}
/**
 * Validates the environment this node runs in. The default implementation
 * only validates the Java runtime.
 *
 * @throws HyracksDataException if validation fails
 */
protected void validateEnvironment() throws HyracksDataException {
    validateJavaRuntime();
}
/**
 * Validates the Java runtime by delegating to
 * {@link ApplicationConfigurator#validateJavaRuntime()}.
 *
 * @throws HyracksDataException if the delegate rejects the runtime
 */
protected void validateJavaRuntime() throws HyracksDataException {
    ApplicationConfigurator.validateJavaRuntime();
}
/**
 * Installs a new {@link PersistedResourceRegistry} on the service context.
 */
protected void configurePersistedResourceRegistry() {
    final PersistedResourceRegistry registry = new PersistedResourceRegistry();
    ncServiceCtx.setPersistedResourceRegistry(registry);
}
/**
 * Loads the credential map from a colon-delimited file.
 * <p>
 * Each record must have exactly two fields; the first is stored as the key
 * and the second as the value (presumably user name and secret — confirm
 * against the code that reads the credential map). The file is decoded with
 * the platform default charset.
 * <p>
 * A missing file yields an empty map. If the file cannot be read or contains
 * a record without exactly two fields, the error is logged and an empty map
 * is returned: entries parsed before the failure are discarded so that a
 * malformed file is never partially honoured.
 *
 * @param credPath path of the credential file; must not be {@code null}
 * @return a mutable map of the parsed entries, empty if the file is missing or malformed
 */
private Map<String, String> parseCredentialMap(String credPath) {
    final File credentialFile = new File(credPath);
    final Map<String, String> storedCredentials = new HashMap<>();
    if (!credentialFile.exists()) {
        return storedCredentials;
    }
    try (CSVParser parser =
            CSVParser.parse(credentialFile, Charset.defaultCharset(), CSVFormat.DEFAULT.withDelimiter(':'))) {
        for (CSVRecord entry : parser.getRecords()) {
            if (entry.size() != 2) {
                throw new IOException("Passwd file must have exactly two fields.");
            }
            storedCredentials.put(entry.get(0), entry.get(1));
        }
    } catch (IOException e) {
        LOGGER.error("Malformed credential file", e);
        // fail closed: drop whatever was parsed before the failure
        storedCredentials.clear();
    }
    return storedCredentials;
}
}