blob: 2f23599c82efda6fca2f5bb37371342ac030b8f6 [file] [log] [blame]
/*
* Copyright 2014 IBM Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ibm.spark.boot.layer
import com.ibm.spark.boot.KernelBootstrap
import com.ibm.spark.interpreter.Interpreter
import com.ibm.spark.utils.LogLike
/**
* Represents the hook (interrupt/shutdown) initialization. All JVM-related
* hooks should be constructed here.
*/
trait HookInitialization {
/**
* Initializes and registers all hooks.
*
* This member is abstract: the trait only declares the entry point, and the
* actual registration logic comes from whichever implementation is mixed in.
*
* @param interpreter The main interpreter, handed to the implementation so
*                    that the hooks it registers can act on it
*/
def initializeHooks(interpreter: Interpreter): Unit
}
/**
* Represents the standard implementation of HookInitialization.
*/
trait StandardHookInitialization extends HookInitialization {
  this: KernelBootstrap with LogLike =>

  /**
   * Initializes and registers all hooks.
   *
   * Installs the interrupt (INT signal) handler first and the JVM shutdown
   * hook second.
   *
   * @param interpreter The main interpreter; it is interrupted whenever a
   *                    lone interrupt signal is received
   */
  def initializeHooks(interpreter: Interpreter): Unit = {
    registerInterruptHook(interpreter)
    registerShutdownHook()
  }

  /**
   * Installs a handler for the INT signal (Ctrl-C).
   *
   * A signal that arrives more than three seconds after the previously
   * recorded one (or the very first signal) interrupts the interpreter and
   * records its arrival time. A signal that arrives within three seconds of
   * the recorded one shuts the kernel down instead.
   *
   * @param interpreter The interpreter whose execution is interrupted
   */
  private def registerInterruptHook(interpreter: Interpreter): Unit = {
    // Inside the anonymous handler `this` is the handler itself, so keep a
    // handle on the enclosing kernel bootstrap for the shutdown call.
    val self = this

    import java.util.concurrent.TimeUnit
    import sun.misc.{Signal, SignalHandler}

    // TODO: Signals are not a good way to handle this since JVM only has the
    // proprietary sun API that is not necessarily available on all platforms
    Signal.handle(new Signal("INT"), new SignalHandler() {
      // Length of the "press Ctrl-C again" window: 3 seconds, in nanoseconds
      private val MaxSignalTime: Long = TimeUnit.SECONDS.toNanos(3)

      // System.nanoTime() reading of the last signal that reset execution;
      // None until the first signal arrives. nanoTime has an arbitrary origin,
      // so a numeric sentinel such as 0 cannot stand in for "never".
      // Handler invocations run off the registering thread, hence @volatile
      // so each invocation sees the timestamp written by the previous one.
      @volatile private var lastSignalReceived: Option[Long] = None

      def handle(sig: Signal): Unit = {
        // Monotonic clock: a wall-clock adjustment between two signals must
        // not turn a lone Ctrl-C into a shutdown (or swallow a real double).
        val currentTime = System.nanoTime()
        val withinWindow =
          lastSignalReceived.exists(previous => currentTime - previous <= MaxSignalTime)

        if (!withinWindow) {
          logger.info("Resetting code execution!")
          interpreter.interrupt()

          // TODO: Cancel group representing current code execution
          //sparkContext.cancelJobGroup()

          logger.info("Enter Ctrl-C twice to shutdown!")
          lastSignalReceived = Some(currentTime)
        } else {
          logger.info("Shutting down kernel")
          self.shutdown()
        }
      }
    })
  }

  /**
   * Registers a JVM shutdown hook that shuts the kernel down and then waits
   * for the thread that registered the hook to finish, if it is still alive.
   */
  private def registerShutdownHook(): Unit = {
    logger.debug("Registering shutdown hook")

    // Inside the anonymous Thread `this` is the hook thread, so capture the
    // enclosing kernel bootstrap and the registering thread up front.
    val self = this
    val mainThread = Thread.currentThread()

    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        logger.info("Shutting down kernel")
        self.shutdown()

        // TODO: Check if you can magically access the spark context to stop it
        // TODO: inside a different thread
        // NOTE(review): join() has no timeout. If the registering thread is
        // itself the one that initiated the JVM exit, it would be waiting on
        // this hook and the join would never return -- confirm that cannot
        // happen before relying on this.
        if (mainThread.isAlive) mainThread.join()
      }
    })
  }
}