blob: 8ee1626f16a5f6043d3d52fb7755f06b1b77536d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarUnpacker;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.util.FileUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
import java.io.File;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
// These are from the minifi-nar-utils
public class MiNiFi {
private static final Logger logger = LoggerFactory.getLogger(MiNiFi.class);
private final MiNiFiServer minifiServer;
private final BootstrapListener bootstrapListener;
public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
private volatile boolean shutdown = false;
public MiNiFi(final NiFiProperties properties)
throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
logger.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString());
logger.error("", e);
}
});
// register the shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
// shutdown the jetty server
shutdownHook(true);
}
}));
final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
if (bootstrapPort != null) {
try {
final int port = Integer.parseInt(bootstrapPort);
if (port < 1 || port > 65535) {
throw new RuntimeException("Failed to start MiNiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
}
bootstrapListener = new BootstrapListener(this, port);
bootstrapListener.start();
} catch (final NumberFormatException nfe) {
throw new RuntimeException("Failed to start MiNiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
}
} else {
logger.info("MiNiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
bootstrapListener = null;
}
// delete the web working dir - if the application does not start successfully
// the web app directories might be in an invalid state. when this happens
// jetty will not attempt to re-extract the war into the directory. by removing
// the working directory, we can be assured that it will attempt to extract the
// war every time the application starts.
File webWorkingDir = properties.getWebWorkingDirectory();
FileUtils.deleteFilesInDirectory(webWorkingDir, null, logger, true, true);
FileUtils.deleteFile(webWorkingDir, logger, 3);
detectTimingIssues();
// redirect JUL log events
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
// expand the nars
NarUnpacker.unpackNars(properties);
// load the extensions classloaders
NarClassLoaders.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
// load the framework classloader
final ClassLoader frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkBundle().getClassLoader();
if (frameworkClassLoader == null) {
throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
}
final Bundle systemBundle = SystemBundle.create(properties);
final Set<Bundle> narBundles = NarClassLoaders.getInstance().getBundles();
// discover the extensions
ExtensionManager.discoverExtensions(systemBundle, narBundles);
ExtensionManager.logClassLoaderMapping();
// load the server from the framework classloader
Thread.currentThread().setContextClassLoader(frameworkClassLoader);
Class<?> minifiServerClass= Class.forName("org.apache.nifi.minifi.MiNiFiServer", true, frameworkClassLoader);
Constructor<?> minifiServerConstructor = minifiServerClass.getConstructor(NiFiProperties.class);
final long startTime = System.nanoTime();
minifiServer = (MiNiFiServer) minifiServerConstructor.newInstance(properties);
if (shutdown) {
logger.info("MiNiFi has been shutdown via MiNiFi Bootstrap. Will not start Controller");
} else {
minifiServer.start();
if (bootstrapListener != null) {
bootstrapListener.sendStartedStatus(true);
}
final long endTime = System.nanoTime();
logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds.");
}
}
protected void shutdownHook(boolean isReload) {
try {
this.shutdown = true;
logger.info("Initiating shutdown of MiNiFi server...");
if (minifiServer != null) {
minifiServer.stop();
}
if (bootstrapListener != null) {
if (isReload) {
bootstrapListener.reload();
} else {
bootstrapListener.stop();
}
}
logger.info("MiNiFi server shutdown completed (nicely or otherwise).");
} catch (final Throwable t) {
logger.warn("Problem occurred ensuring MiNiFi server was properly terminated due to " + t);
}
}
/**
* Determine if the machine we're running on has timing issues.
*/
private void detectTimingIssues() {
final int minRequiredOccurrences = 25;
final int maxOccurrencesOutOfRange = 15;
final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis());
final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
@Override
public Thread newThread(final Runnable r) {
final Thread t = defaultFactory.newThread(r);
t.setDaemon(true);
t.setName("Detect Timing Issues");
return t;
}
});
final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0);
final AtomicInteger occurrences = new AtomicInteger(0);
final Runnable command = new Runnable() {
@Override
public void run() {
final long curMillis = System.currentTimeMillis();
final long difference = curMillis - lastTriggerMillis.get();
final long millisOff = Math.abs(difference - 2000L);
occurrences.incrementAndGet();
if (millisOff > 500L) {
occurrencesOutOfRange.incrementAndGet();
}
lastTriggerMillis.set(curMillis);
}
};
final ScheduledFuture<?> future = service.scheduleWithFixedDelay(command, 2000L, 2000L, TimeUnit.MILLISECONDS);
final TimerTask timerTask = new TimerTask() {
@Override
public void run() {
future.cancel(true);
service.shutdownNow();
if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
logger.warn("MiNiFi has detected that this box is not responding within the expected timing interval, which may cause "
+ "Processors to be scheduled erratically. Please see the MiNiFi documentation for more information.");
}
}
};
final Timer timer = new Timer(true);
timer.schedule(timerTask, 60000L);
}
MiNiFiServer getMinifiServer() {
return minifiServer;
}
/**
* Main entry point of the application.
*
* @param args things which are ignored
*/
public static void main(String[] args) {
logger.info("Launching MiNiFi...");
try {
NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, null);
new MiNiFi(niFiProperties);
} catch (final Throwable t) {
logger.error("Failure to launch MiNiFi due to " + t, t);
}
}
}