blob: 01e1c71da91995b473dd9644ab14304647f6c795 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.diagnostics.DiagnosticsDump;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.nar.NarUnpacker;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.DiagnosticUtils;
import org.apache.nifi.util.FileUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
public class NiFi implements NiFiEntryPoint {
/** Name of the JVM system property that is read at start-up for the port handed to the BootstrapListener. */
public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
/** Timestamp pattern used when naming the diagnostic files written on shutdown. */
public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss");
private static final Logger LOGGER = LoggerFactory.getLogger(NiFi.class);
/** Command-line flag that is followed by the path of the file holding the properties protection key. */
private static final String KEY_FILE_FLAG = "-K";
/** Server implementation obtained from the NAR class loaders during construction. */
private final NiFiServer nifiServer;
/** Listener for Bootstrap requests; null when the bootstrap port system property is not set. */
private final BootstrapListener bootstrapListener;
private final NiFiProperties properties;
/** Set by shutdown() (which the shutdown hook runs on its own thread) and checked by the constructor before starting the server. */
private volatile boolean shutdown = false;
/**
 * Creates and starts NiFi using the system class loader as the root class loader.
 * Delegates to the two-argument constructor.
 *
 * @param properties the NiFi properties used to configure start-up
 * @throws ClassNotFoundException declared by the delegated constructor
 * @throws IOException declared by the delegated constructor
 * @throws IllegalArgumentException declared by the delegated constructor
 */
public NiFi(final NiFiProperties properties)
throws ClassNotFoundException, IOException, IllegalArgumentException {
this(properties, ClassLoader.getSystemClassLoader());
}
/**
 * Performs the complete start-up sequence. In order, it:
 * sets the global Kerberos configuration system property when a krb5 file is configured,
 * installs the default uncaught-exception handler and the JVM shutdown hook,
 * starts a BootstrapListener when the bootstrap port system property is present,
 * removes the web working directory, schedules the timing check, bridges JUL logging,
 * unpacks the NARs and initializes the NAR class loaders,
 * and finally initializes the NiFiServer and starts it unless shutdown was requested in the meantime.
 *
 * @param properties the NiFi properties used to configure start-up
 * @param rootClassLoader class loader used to create the system bundle and to initialize the NAR class loaders
 * @throws ClassNotFoundException declared; presumably raised by the NAR loading calls - confirm against those APIs
 * @throws IOException declared; presumably raised by the file and NAR handling calls - confirm against those APIs
 * @throws IllegalArgumentException declared; not thrown directly by the code visible here
 * @throws RuntimeException if the bootstrap port system property is not an integer in the range 1 - 65535
 * @throws IllegalStateException if the framework NAR class loader or a NiFiServer implementation cannot be found
 */
public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader)
throws ClassNotFoundException, IOException, IllegalArgumentException {
this.properties = properties;
// There can only be one krb5.conf for the overall Java process so set this globally during
// start up so that processors and our Kerberos authentication code don't have to set this
final File kerberosConfigFile = properties.getKerberosConfigurationFile();
if (kerberosConfigFile != null) {
final String kerberosConfigFilePath = kerberosConfigFile.getAbsolutePath();
LOGGER.info("Setting java.security.krb5.conf to {}", kerberosConfigFilePath);
System.setProperty("java.security.krb5.conf", kerberosConfigFilePath);
}
setDefaultUncaughtExceptionHandler();
// register the shutdown hook
addShutdownHook();
// the bootstrap port is optional; without it no BootstrapListener is created
final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
if (bootstrapPort != null) {
try {
final int port = Integer.parseInt(bootstrapPort);
if (port < 1 || port > 65535) {
throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
}
bootstrapListener = new BootstrapListener(this, port);
bootstrapListener.start();
} catch (final NumberFormatException nfe) {
throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
}
} else {
LOGGER.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
bootstrapListener = null;
}
// delete the web working dir - if the application does not start successfully
// the web app directories might be in an invalid state. when this happens
// jetty will not attempt to re-extract the war into the directory. by removing
// the working directory, we can be assured that it will attempt to extract the
// war every time the application starts.
File webWorkingDir = properties.getWebWorkingDirectory();
FileUtils.deleteFilesInDirectory(webWorkingDir, null, LOGGER, true, true);
FileUtils.deleteFile(webWorkingDir, LOGGER, 3);
// runs in the background on daemon threads; does not block start-up
detectTimingIssues();
// redirect JUL log events
initLogging();
final Bundle systemBundle = SystemBundle.create(properties, rootClassLoader);
// expand the nars
final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);
// load the extensions classloaders
NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
narClassLoaders.init(rootClassLoader,
properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
// load the framework classloader
final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader();
if (frameworkClassLoader == null) {
throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
}
final Set<Bundle> narBundles = narClassLoaders.getBundles();
// start of the interval reported in the "Controller initialization took" log line below
final long startTime = System.nanoTime();
nifiServer = narClassLoaders.getServer();
if (nifiServer == null) {
throw new IllegalStateException("Unable to find a NiFiServer implementation.");
}
// run the server's initialization with the class loader that loaded the server implementation
Thread.currentThread().setContextClassLoader(nifiServer.getClass().getClassLoader());
// Filter out the framework NAR from being loaded by the NiFiServer
// NOTE(review): no filtering is visible at this call site; narBundles is passed exactly as returned
// by getBundles() - confirm whether the comment above is stale.
nifiServer.initialize(properties,
systemBundle,
narBundles,
extensionMapping);
// shutdown() may have been invoked on another thread while initialization was in progress
if (shutdown) {
LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
} else {
nifiServer.start();
if (bootstrapListener != null) {
bootstrapListener.setNiFiLoaded(true);
bootstrapListener.sendStartedStatus(true);
}
final long duration = System.nanoTime() - startTime;
LOGGER.info("Controller initialization took {} nanoseconds ( {} seconds).",
duration, (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS));
}
}
/**
 * Returns the server instance that was obtained from the NAR class loaders during construction.
 *
 * @return the NiFiServer held by this instance
 */
public NiFiServer getServer() {
    return this.nifiServer;
}
/**
 * Installs a JVM-wide default handler that logs any exception escaping a thread:
 * first a one-line summary naming the thread, then the full stack trace.
 */
protected void setDefaultUncaughtExceptionHandler() {
    final Thread.UncaughtExceptionHandler loggingHandler = (failedThread, cause) -> {
        LOGGER.error("An Unknown Error Occurred in Thread {}: {}", failedThread, cause.toString());
        LOGGER.error("", cause);
    };
    Thread.setDefaultUncaughtExceptionHandler(loggingHandler);
}
/**
 * Registers a JVM shutdown hook that invokes {@code shutdownHook(false)} on its own thread.
 */
protected void addShutdownHook() {
    // shutdown the jetty server
    final Thread hookThread = new Thread(() -> shutdownHook(false));
    Runtime.getRuntime().addShutdownHook(hookThread);
}
/**
 * Redirects java.util.logging output to SLF4J by replacing the handlers on the JUL root logger
 * with the SLF4J bridge handler.
 */
protected void initLogging() {
// remove the existing root handlers first, then install the bridge
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
}
/**
 * Builds a class loader over every entry found directly inside {@code lib/bootstrap}
 * (resolved against the current working directory), parented by the calling thread's
 * context class loader. Entries that cannot be converted to a URL are logged and skipped;
 * if the directory cannot be listed a warning is logged and the loader has no URLs.
 *
 * @return the bootstrap class loader
 */
private static ClassLoader createBootstrapClassLoader() {
    // Collect a URL for each entry of the bootstrap folder
    final List<URL> bootstrapUrls = new ArrayList<>();
    try (final Stream<Path> entries = Files.list(Paths.get("lib/bootstrap"))) {
        for (final Path p : (Iterable<Path>) entries::iterator) {
            try {
                bootstrapUrls.add(p.toUri().toURL());
            } catch (final MalformedURLException mef) {
                LOGGER.warn("Unable to load " + p.getFileName() + " due to " + mef, mef);
            }
        }
    } catch (IOException ioe) {
        LOGGER.warn("Unable to access lib/bootstrap to create bootstrap classloader", ioe);
    }

    // Create the bootstrap classloader
    final URL[] urlArray = bootstrapUrls.toArray(new URL[0]);
    final ClassLoader parentLoader = Thread.currentThread().getContextClassLoader();
    return new URLClassLoader(urlArray, parentLoader);
}
/**
 * Captures diagnostics (when enabled) and then shuts NiFi down.
 * <p>
 * The two steps are isolated from each other: a failure while writing diagnostics is logged
 * and the shutdown is still attempted, so a diagnostics problem can no longer leave the
 * server and the bootstrap listener running. Nothing is propagated to the caller.
 *
 * @param isReload not used by this implementation
 */
public void shutdownHook(final boolean isReload) {
    try {
        runDiagnosticsOnShutdown();
    } catch (final Throwable t) {
        // diagnostics are best-effort; they must never prevent the actual shutdown
        LOGGER.warn("Problem occurred capturing diagnostics on shutdown; continuing with shutdown", t);
    }
    try {
        shutdown();
    } catch (final Throwable t) {
        LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to ", t);
    }
}
/**
 * Writes a diagnostic dump into the configured directory when diagnostics-on-shutdown is enabled.
 * Before writing, the oldest files in that directory are deleted one at a time for as long as
 * either the configured maximum file count or the configured maximum directory size is exceeded.
 *
 * @throws IOException if deleting an old file or writing the new dump fails
 */
private void runDiagnosticsOnShutdown() throws IOException {
    if (!properties.isDiagnosticsOnShutdownEnabled()) {
        return;
    }

    final String directory = properties.getDiagnosticsOnShutdownDirectory();
    if (DiagnosticUtils.createDiagnosticDirectory(directory)) {
        LOGGER.debug("Diagnostic directory has successfully been created.");
    }

    while (true) {
        boolean limitExceeded = DiagnosticUtils.isFileCountExceeded(directory, properties.getDiagnosticsOnShutdownMaxFileCount());
        if (!limitExceeded) {
            // the size limit is only consulted when the file count is within bounds
            limitExceeded = DiagnosticUtils.isSizeExceeded(directory,
                    DataUnit.parseDataSize(properties.getDiagnosticsOnShutdownDirectoryMaxSize(), DataUnit.B).longValue());
        }
        if (!limitExceeded) {
            break;
        }
        Files.delete(DiagnosticUtils.getOldestFile(directory));
    }

    final String timestamp = DATE_TIME_FORMATTER.format(LocalDateTime.now());
    final File target = new File(String.format("%s/diagnostic-%s.log", directory, timestamp));
    diagnose(target, properties.isDiagnosticsOnShutdownVerbose());
}
/**
 * Creates a diagnostics dump through the server's diagnostics factory and writes it to the given file.
 * The dump is created before the file is opened.
 *
 * @param file destination file
 * @param verbose passed to the diagnostics factory when creating the dump
 * @throws IOException if the file cannot be opened or written
 */
private void diagnose(final File file, final boolean verbose) throws IOException {
    final DiagnosticsDump dump = getServer().getDiagnosticsFactory().create(verbose);
    try (final OutputStream out = new FileOutputStream(file)) {
        dump.writeTo(out);
    }
}
/**
 * Marks this instance as shut down, then stops the server and the bootstrap listener,
 * each only if it exists.
 */
protected void shutdown() {
    shutdown = true;
    LOGGER.info("Initiating shutdown of Jetty web server...");

    final NiFiServer server = this.nifiServer;
    if (server != null) {
        server.stop();
    }

    final BootstrapListener listener = this.bootstrapListener;
    if (listener != null) {
        listener.stop();
    }

    LOGGER.info("Jetty web server shutdown completed (nicely or otherwise).");
}
/**
 * Determine if the machine we're running on has timing issues.
 * <p>
 * A task is scheduled with a fixed delay of two seconds on a daemon thread; each run records
 * whether the wall-clock gap since the previous run deviates from two seconds by more than
 * 500 ms. After 60 seconds a daemon timer cancels the task and logs a warning when fewer than
 * 25 runs happened or more than 15 of them were out of range. The method returns immediately.
 */
private void detectTimingIssues() {
    final int minRequiredOccurrences = 25;
    final int maxOccurrencesOutOfRange = 15;
    final AtomicLong previousRunMillis = new AtomicLong(System.currentTimeMillis());

    // daemon thread so that the check never keeps the JVM alive
    final ThreadFactory baseFactory = Executors.defaultThreadFactory();
    final ThreadFactory daemonFactory = runnable -> {
        final Thread worker = baseFactory.newThread(runnable);
        worker.setDaemon(true);
        worker.setName("Detect Timing Issues");
        return worker;
    };
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, daemonFactory);

    final AtomicInteger outOfRangeCount = new AtomicInteger(0);
    final AtomicInteger runCount = new AtomicInteger(0);
    final Runnable sampler = () -> {
        final long nowMillis = System.currentTimeMillis();
        final long elapsed = nowMillis - previousRunMillis.get();
        final long deviation = Math.abs(elapsed - 2000L);
        runCount.incrementAndGet();
        if (deviation > 500L) {
            outOfRangeCount.incrementAndGet();
        }
        previousRunMillis.set(nowMillis);
    };
    final ScheduledFuture<?> sampling = scheduler.scheduleWithFixedDelay(sampler, 2000L, 2000L, TimeUnit.MILLISECONDS);

    final TimerTask evaluation = new TimerTask() {
        @Override
        public void run() {
            sampling.cancel(true);
            scheduler.shutdownNow();
            final boolean tooFewRuns = runCount.get() < minRequiredOccurrences;
            if (tooFewRuns || outOfRangeCount.get() > maxOccurrencesOutOfRange) {
                LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause "
                        + "Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
            }
        }
    };
    new Timer(true).schedule(evaluation, 60000L);
}
/**
 * Main entry point of the application.
 * <p>
 * Loads and validates the NiFi properties and constructs a {@code NiFi} instance, which performs
 * the start-up. Any failure is logged rather than propagated.
 *
 * @param args command-line arguments; they are scanned for the {@code -K} flag followed by the
 *             path of a key file used when loading the properties
 */
public static void main(String[] args) {
    LOGGER.info("Launching NiFi...");
    try {
        new NiFi(convertArgumentsToValidatedNiFiProperties(args));
    } catch (final Throwable t) {
        LOGGER.error("Failure to launch NiFi due to " + t, t);
    }
}
/**
 * Loads and validates the NiFi properties using a class loader built over {@code lib/bootstrap}.
 *
 * @param args command-line arguments, scanned for the key file flag
 * @return the validated properties
 */
protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args) {
    final ClassLoader bootstrapClassLoader = createBootstrapClassLoader();
    return convertArgumentsToValidatedNiFiProperties(args, bootstrapClassLoader);
}
/**
 * Loads the NiFi properties through the given class loader and validates them.
 *
 * @param args command-line arguments, scanned for the key file flag
 * @param bootstrapClassLoader class loader used to locate the properties loader
 * @return the validated properties
 */
protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args, final ClassLoader bootstrapClassLoader) {
    final NiFiProperties loaded = initializeProperties(args, bootstrapClassLoader);
    loaded.validate();
    return loaded;
}
/**
 * Extracts the protection key from the arguments (empty when none was supplied) and loads the
 * NiFi properties by reflectively calling {@code NiFiPropertiesLoader.withKey(key).get()} through
 * the given class loader. The thread's context class loader is switched to that loader for the
 * duration of the call and restored afterwards.
 *
 * @param args command-line arguments, scanned for the key file flag
 * @param boostrapLoader class loader used to resolve the properties loader class
 * @return the loaded properties
 * @throws IllegalArgumentException if the key is invalid, the loader cannot be reached reflectively,
 *                                  or loading the properties fails
 */
private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) {
    final String decryptionFailure = "There was an issue decrypting protected properties";
    final ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();

    // The key might be empty when it is passed to the loader
    final String key;
    try {
        key = loadFormattedKey(args);
    } catch (IllegalArgumentException e) {
        throw new IllegalArgumentException("The bootstrap process did not provide a valid key", e);
    }

    Thread.currentThread().setContextClassLoader(boostrapLoader);
    try {
        final Class<?> loaderClass = Class.forName("org.apache.nifi.properties.NiFiPropertiesLoader", true, boostrapLoader);
        final Object loader = loaderClass.getMethod("withKey", String.class).invoke(null, key);
        final NiFiProperties loaded = (NiFiProperties) loaderClass.getMethod("get").invoke(loader);
        LOGGER.info("Loaded {} properties", loaded.size());
        return loaded;
    } catch (InvocationTargetException wrappedException) {
        // surface the underlying failure rather than the reflection wrapper when there is one
        final Throwable underlying = wrappedException.getCause();
        if (underlying == null) {
            throw new IllegalArgumentException(decryptionFailure, wrappedException);
        }
        throw new IllegalArgumentException(decryptionFailure, underlying);
    } catch (final IllegalAccessException | NoSuchMethodException | ClassNotFoundException reex) {
        throw new IllegalArgumentException(
                "Unable to access properties loader in the expected manner - apparent classpath or build issue", reex);
    } catch (final RuntimeException e) {
        throw new IllegalArgumentException(decryptionFailure, e);
    } finally {
        Thread.currentThread().setContextClassLoader(originalLoader);
    }
}
/**
 * Returns the protection key supplied through the key file flag, normalized to lower-case hex.
 *
 * @param args command-line arguments
 * @return the formatted key, or an empty string when the key file flag is absent
 * @throws IllegalArgumentException if a key was supplied but is not valid hex of an accepted length
 */
private static String loadFormattedKey(String[] args) {
    final List<String> parsedArgs = parseArgs(args);

    // Without the flag there is no key to load
    if (!parsedArgs.contains(KEY_FILE_FLAG)) {
        return "";
    }

    // Format the key (check hex validity and remove spaces)
    final String formatted = formatHexKey(getKeyFromKeyFileAndPrune(parsedArgs));
    if (!isHexKeyValid(formatted)) {
        throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length");
    }
    return formatted;
}
/**
 * Reads the protection key from the file named after the key file flag, then overwrites that
 * file with a random hex pad and deletes it so the key material does not linger on disk.
 * <p>
 * The writer used for the overwrite is closed even when the write fails, and a failed delete
 * is reported with a warning instead of being ignored.
 * On an {@link IOException} the error is logged and the JVM exits with status 1.
 *
 * @param parsedArgs the parsed arguments; expected to contain the key file flag
 * @return the raw contents of the key file decoded as UTF-8
 * @throws IllegalArgumentException if no file name follows the flag or the file is empty
 */
private static String getKeyFromKeyFileAndPrune(List<String> parsedArgs) {
    String key = null;
    LOGGER.debug("The bootstrap process provided the " + KEY_FILE_FLAG + " flag");
    final int flagIndex = parsedArgs.indexOf(KEY_FILE_FLAG);
    if (parsedArgs.size() <= flagIndex + 1) {
        LOGGER.error("The bootstrap process passed the {} flag without a filename", KEY_FILE_FLAG);
        throw new IllegalArgumentException("The bootstrap process provided the " + KEY_FILE_FLAG + " flag but no key");
    }
    try {
        final String passwordfilePath = parsedArgs.get(flagIndex + 1);
        // Slurp in the contents of the file:
        final byte[] encoded = Files.readAllBytes(Paths.get(passwordfilePath));
        key = new String(encoded, StandardCharsets.UTF_8);
        if (key.isEmpty()) {
            throw new IllegalArgumentException("Key in keyfile " + passwordfilePath + " yielded an empty key");
        }
        LOGGER.info("Now overwriting file in {}", passwordfilePath);
        // Overwrite the contents of the file (to avoid littering file system
        // unlinked with key material):
        final File passwordFile = new File(passwordfilePath);
        // Construe a random pad:
        final Random random = new Random();
        final StringBuilder sb = new StringBuilder();
        // Note on correctness: this pad is longer, but equally sufficient.
        while (sb.length() < encoded.length) {
            sb.append(Integer.toHexString(random.nextInt()));
        }
        final String pad = sb.toString();
        LOGGER.info("Overwriting key material with pad: {}", pad);
        // try-with-resources guarantees the file handle is released even if the write fails
        try (FileWriter overwriter = new FileWriter(passwordFile, false)) {
            overwriter.write(pad);
        }
        LOGGER.info("Removing/unlinking file: {}", passwordfilePath);
        if (!passwordFile.delete()) {
            // the contents are already overwritten, so this is reported rather than treated as fatal
            LOGGER.warn("Unable to remove key file {} after overwriting its contents", passwordfilePath);
        }
    } catch (IOException e) {
        LOGGER.error("Caught IOException while retrieving the {} -passed keyfile; aborting: {}", KEY_FILE_FLAG, e.toString());
        System.exit(1);
    }
    LOGGER.info("Read property protection key from key file provided by bootstrap process");
    return key;
}
/**
 * Copies the arguments into a mutable list and, for the first argument that starts with the
 * key file flag followed by a space, splits it into two consecutive entries: the flag itself
 * and the remainder of that argument.
 *
 * @param args command-line arguments
 * @return the arguments as a list, with at most one combined flag argument split in two
 */
private static List<String> parseArgs(String[] args) {
    final List<String> expanded = new ArrayList<>(Arrays.asList(args));
    final String combinedPrefix = KEY_FILE_FLAG + " ";

    for (int index = 0; index < args.length; index++) {
        final String candidate = args[index];
        if (!candidate.startsWith(combinedPrefix)) {
            continue;
        }
        final String[] flagAndValue = candidate.split(" ", 2);
        expanded.set(index, flagAndValue[0]);
        expanded.add(index + 1, flagAndValue[1]);
        // only the first combined occurrence is split
        break;
    }
    return expanded;
}
/**
 * Normalizes a key by dropping every character that is not a hexadecimal digit and
 * lower-casing the rest.
 *
 * @param input raw key text, may be null
 * @return the lower-case hex digits of the input, or an empty string for null or blank input
 */
private static String formatHexKey(String input) {
    if (input == null || input.trim().isEmpty()) {
        return "";
    }

    final StringBuilder hexOnly = new StringBuilder(input.length());
    for (int i = 0; i < input.length(); i++) {
        final char c = input.charAt(i);
        if ((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) {
            hexOnly.append(c);
        } else if (c >= 'A' && c <= 'F') {
            // shift the upper-case hex letter into its lower-case counterpart
            hexOnly.append((char) (c + ('a' - 'A')));
        }
    }
    return hexOnly.toString();
}
/**
 * Checks that a key consists solely of hexadecimal digits and encodes 128, 192 or 256 bits.
 * <p>
 * The accepted sizes were previously listed as 128, 196 and 256; 196 was a typo for 192, which
 * rejected valid 48-character (192-bit) keys and accepted a 49-character string.
 *
 * @param key the key to check, may be null
 * @return true if the key is non-blank hex of 32, 48 or 64 characters; false otherwise
 */
private static boolean isHexKeyValid(String key) {
    if (key == null || key.trim().isEmpty()) {
        return false;
    }
    // Key length is in "nibbles" (i.e. one hex char = 4 bits)
    return Arrays.asList(128, 192, 256).contains(key.length() * 4) && key.matches("^[0-9a-fA-F]*$");
}
}