blob: 46519b8d8ac85f606fcbda46b7e7c0d7e4b032c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history.questdb;
import io.questdb.MessageBusImpl;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.DefaultCairoConfiguration;
import io.questdb.griffin.SqlCompiler;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.SqlExecutionContextImpl;
import org.apache.nifi.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* The database manager is responsible for checking and maintaining the health of the database during startup.
*/
public final class QuestDbDatabaseManager {
private enum DatabaseStatus {
HEALTHY, NON_EXISTING, CORRUPTED;
}
private static final Logger LOGGER = LoggerFactory.getLogger(QuestDbDatabaseManager.class);
private static final Set<String> COMPONENT_TABLES = new HashSet<>();
private static final Set<String> NODE_TABLES = new HashSet<>();
static {
COMPONENT_TABLES.add("componentCounter");
COMPONENT_TABLES.add("connectionStatus");
COMPONENT_TABLES.add("processGroupStatus");
COMPONENT_TABLES.add("remoteProcessGroupStatus");
COMPONENT_TABLES.add("processorStatus");
NODE_TABLES.add("nodeStatus");
NODE_TABLES.add("garbageCollectionStatus");
NODE_TABLES.add("storageStatus");
}
private QuestDbDatabaseManager() {
// Should not be instantiated.
}
public static void checkDatabaseStatus(final Path persistLocation) {
final QuestDbDatabaseManager.DatabaseStatus databaseStatus = getDatabaseStatus(persistLocation);
LOGGER.debug("Starting status repository. It's estimated status is {}", databaseStatus);
if (databaseStatus == QuestDbDatabaseManager.DatabaseStatus.NON_EXISTING) {
createDatabase(persistLocation);
} else if (databaseStatus == QuestDbDatabaseManager.DatabaseStatus.CORRUPTED) {
throw new RuntimeException("The database is corrupted. The expected set of tables is not matching with the reachable tables.");
}
}
private static DatabaseStatus getDatabaseStatus(final Path persistLocation) {
if (!checkPersistentLocationExists(persistLocation)) {
return DatabaseStatus.NON_EXISTING;
}
if (checkPersistentLocationExists(persistLocation) && checkPersistentLocationIsEmpty(persistLocation)) {
return DatabaseStatus.NON_EXISTING;
}
if (!checkTablesAreInPlace(persistLocation) || !checkConnection(persistLocation)) {
return DatabaseStatus.CORRUPTED;
}
return DatabaseStatus.HEALTHY;
}
private static boolean checkPersistentLocationExists(final Path persistLocation) {
final File persistLocationDirectory = persistLocation.toFile();
return persistLocationDirectory.exists() && persistLocationDirectory.isDirectory();
}
private static boolean checkPersistentLocationIsEmpty(final Path persistLocation) {
final File persistLocationDirectory = persistLocation.toFile();
return persistLocationDirectory.list().length == 0;
}
private static boolean checkTablesAreInPlace(final Path persistLocation) {
final File persistLocationDirectory = persistLocation.toFile();
final Map<String, File> databaseFiles = Arrays.stream(persistLocationDirectory.listFiles())
.collect(Collectors.toMap(f -> f.getAbsolutePath().substring(persistLocationDirectory.getAbsolutePath().length() + 1), f -> f));
final Set<String> expectedTables = new HashSet<>();
expectedTables.addAll(NODE_TABLES);
expectedTables.addAll(COMPONENT_TABLES);
for (final String expectedTable : expectedTables) {
if (!databaseFiles.containsKey(expectedTable) || !databaseFiles.get(expectedTable).isDirectory()) {
LOGGER.error("Missing table during database status check: ", expectedTable);
return false;
}
}
return true;
}
private static boolean checkConnection(final Path persistLocation) {
final CairoConfiguration configuration = new DefaultCairoConfiguration(persistLocation.toFile().getAbsolutePath());
try (
final CairoEngine engine = new CairoEngine(configuration);
) {
LOGGER.info("Connection to database was successful");
return true;
} catch (Exception e) {
LOGGER.error("Error during connection to database", e);
return false;
}
}
private static void createDatabase(final Path persistLocation) {
LOGGER.info("Creating database");
final CairoConfiguration configuration;
try {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(persistLocation.toFile());
} catch (final Exception e) {
throw new RuntimeException("Could not create database folder " + persistLocation.toAbsolutePath().toString(), e);
}
configuration = new DefaultCairoConfiguration(persistLocation.toFile().getAbsolutePath());
try (
final CairoEngine engine = new CairoEngine(configuration);
final SqlCompiler compiler = new SqlCompiler(engine);
) {
final SqlExecutionContext context = new SqlExecutionContextImpl(engine.getConfiguration(), new MessageBusImpl(), 1);
// Node status tables
compiler.compile(QuestDbQueries.CREATE_GARBAGE_COLLECTION_STATUS, context);
compiler.compile(QuestDbQueries.CREATE_NODE_STATUS, context);
compiler.compile(QuestDbQueries.CREATE_STORAGE_STATUS, context);
// Component status tables
compiler.compile(QuestDbQueries.CREATE_CONNECTION_STATUS, context);
compiler.compile(QuestDbQueries.CREATE_PROCESS_GROUP_STATUS, context);
compiler.compile(QuestDbQueries.CREATE_REMOTE_PROCESS_GROUP_STATUS, context);
compiler.compile(QuestDbQueries.CREATE_PROCESSOR_STATUS, context);
compiler.compile(QuestDbQueries.CREATE_COMPONENT_COUNTER, context);
LOGGER.info("Database is created");
} catch (final Exception e) {
throw new RuntimeException("Could not create database!", e);
}
}
public static Set<String> getNodeTableNames() {
return NODE_TABLES;
}
public static Set<String> getComponentTableNames() {
return COMPONENT_TABLES;
}
}