| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.iota.fey |
| |
| import java.io.{BufferedWriter, File, FileWriter} |
| import java.net.{URL, URLClassLoader} |
| import java.nio.file.{Files, Paths} |
| |
| import ch.qos.logback.classic.{Level, Logger, LoggerContext} |
| import ch.qos.logback.core.joran.spi.JoranException |
| import ch.qos.logback.core.util.StatusPrinter |
| import com.eclipsesource.schema.SchemaType |
| import com.typesafe.config.{Config, ConfigFactory, ConfigValue} |
| import org.slf4j.LoggerFactory |
| import play.api.libs.json.{JsObject, JsValue, Json} |
| |
| import scala.collection.mutable.HashMap |
| import scala.io.Source |
| |
| protected object Utils { |
| |
| import CONFIG._ |
| |
| private val log = LoggerFactory.getLogger(this.getClass) |
| |
| /** |
| * Keeps the loaded clazz in memory |
| * JARNAME,[CLASSPATH, CLASS] |
| */ |
| val loadedJars: HashMap[String, (URLClassLoader, Map[String, Class[FeyGenericActor]])] |
| = HashMap.empty[String, (URLClassLoader, Map[String, Class[FeyGenericActor]])] |
| |
| /** |
| * Gets a list of Files in the directory |
| * |
| * @param stringPath dir path |
| * @return Array of files in the directory |
| */ |
| def getFilesInDirectory(stringPath: String): Array[File]= { |
| val dir = new File(stringPath) |
| if (dir.exists && dir.isDirectory) { |
| dir.listFiles() |
| }else{ |
| Array.empty |
| } |
| } |
| |
| /** |
| * Loads an actor class from a .jar that inherited from FeyGenericActor |
| * |
| * @param path path to the .jar (including the name) |
| * @param className class path inside the jar |
| * @return class of FeyGenericActor |
| */ |
| def loadActorClassFromJar(path: String, className: String, jarName: String):Class[FeyGenericActor] = { |
| |
| loadedJars.get(jarName) match { |
| |
| case None => |
| val urls:Array[URL] = Array(new URL(s"jar:file:$path!/")) |
| val cl: URLClassLoader = URLClassLoader.newInstance(urls) |
| val clazz = cl.loadClass(className) |
| val feyClazz = clazz.asInstanceOf[Class[FeyGenericActor]] |
| log.info(s"$path -> $className") |
| loadedJars.put(jarName, (cl, Map(className -> feyClazz))) |
| feyClazz |
| |
| case Some(loadedJar) => |
| loadedJar._2.get(className) match { |
| case None => |
| val clazz = loadedJar._1.loadClass(className) |
| val feyClazz = clazz.asInstanceOf[Class[FeyGenericActor]] |
| loadedJars.put(jarName, (loadedJar._1, Map(className -> feyClazz) ++ loadedJar._2)) |
| feyClazz |
| case Some(clazz) => |
| clazz |
| } |
| } |
| |
| } |
| |
| /** |
| * Loads a JSON object from a file |
| * |
| * @param file |
| * @return JsValue of the file |
| */ |
| def loadJsonFromFile(file: File): Option[JsValue] = { |
| try{ |
| val stringJson = Source.fromFile(file).getLines.mkString |
| Option(Json.parse(stringJson)) |
| }catch{ |
| case e: Exception => |
| log.error("Could not parse JSON", e) |
| None |
| } |
| } |
| |
| def renameProcessedFile(file: File, extension: String): Unit = { |
| if(CHEKPOINT_ENABLED) { |
| file.renameTo(new File(s"${file.getAbsoluteFile}.$extension")) |
| } |
| } |
| |
| /** |
| * Saves the Orchestration JSON to a tmp directory so Fey can recovery in case it stops or fails |
| * |
| * @param orchestrationID |
| * @param delete |
| * @return |
| */ |
| def updateOrchestrationState(orchestrationID: String, delete: Boolean = false) : Unit = { |
| if (CHEKPOINT_ENABLED) { |
| FEY_CACHE.activeOrchestrations.get(orchestrationID) match { |
| case None => |
| if (!delete) { |
| log.warn(s"Could not save state for Orchestration ${orchestrationID}. It is not active on Fey.") |
| } |
| else { |
| val file = new File(s"$CHECKPOINT_DIR/${orchestrationID}.json") |
| if (!file.createNewFile()) { |
| file.delete() |
| } |
| ORCHESTRATION_CACHE.orchestration_metadata.remove(orchestrationID) |
| ORCHESTRATION_CACHE.orchestration_name.remove(orchestrationID) |
| } |
| case Some(orch) => |
| ORCHESTRATION_CACHE.orchestration_metadata.get(orchestrationID) match { |
| case None => log.warn(s"Could not save state for Orchestration ${orchestrationID}. No metadata defined.") |
| case Some(metadata) => |
| val ensembleJSON = metadata.map(ensenble => ensenble._2) |
| val name: String = ORCHESTRATION_CACHE.orchestration_name.getOrElse(orchestrationID, "NOT SAVED") |
| val orchestrationSpec = Json.obj(JSON_PATH.GUID -> orchestrationID, |
| JSON_PATH.COMMAND -> "RECREATE", |
| JSON_PATH.ORCHESTRATION_NAME -> name, |
| JSON_PATH.ORCHESTRATION_TIMESTAMP -> System.currentTimeMillis.toString, |
| JSON_PATH.ENSEMBLES -> ensembleJSON |
| ) |
| |
| val file = new File(s"$CHECKPOINT_DIR/${orchestrationID}.json") |
| file.getParentFile().mkdirs() |
| file.createNewFile() |
| val bw = new BufferedWriter(new FileWriter(file)) |
| bw.write(Json.stringify(orchestrationSpec)) |
| bw.close() |
| log.info(s"Orchestration ${orchestrationID} saved.") |
| } |
| } |
| }else{ |
| log.debug("Checkpoint not enabled") |
| } |
| } |
| |
| /** |
| * timestamp in milliseconds |
| * |
| * @return Long |
| */ |
| def getTimestamp:Long = System.currentTimeMillis() |
| |
| } |
| |
| object JSON_PATH{ |
| val PERFORMERS: String = "performers" |
| val CONNECTIONS: String = "connections" |
| val GUID: String = "guid" |
| val COMMAND: String = "command" |
| val ENSEMBLES: String = "ensembles" |
| val SCHEDULE: String = "schedule" |
| val BACKOFF: String = "backoff" |
| val SOURCE: String = "source" |
| val SOURCE_NAME: String = "name" |
| val SOURCE_CLASSPATH: String = "classPath" |
| val SOURCE_PARAMS: String = "parameters" |
| val ORCHESTRATION_NAME = "name" |
| val ORCHESTRATION_TIMESTAMP = "timestamp" |
| val PERFORMER_AUTO_SCALE = "autoScale" |
| val CONTROL_AWARE = "controlAware" |
| val JAR_LOCATION = "location" |
| val JAR_LOCATION_URL = "url" |
| val JAR_CREDENTIALS_URL = "credentials" |
| val JAR_CRED_USER = "user" |
| val JAR_CRED_PASSWORD = "password" |
| val PERFORMER_DISPATCHER = "dispatcher" |
| } |
| |
| object CONFIG{ |
| |
| private val log = LoggerFactory.getLogger(this.getClass) |
| |
| val FILE_APPENDER = "FEY-FILE" |
| val CONSOLE_APPENDER = "FEY-CONSOLE" |
| val CONTROL_AWARE_MAILBOX = "akka.fey-dispatchers.control-aware-dispatcher" |
| |
| var CHECKPOINT_DIR = "" |
| var JSON_REPOSITORY = "" |
| var JSON_EXTENSION = "" |
| var JAR_REPOSITORY = "" |
| var CHEKPOINT_ENABLED = true |
| var LOG_LEVEL = "" |
| var LOG_APPENDER = "" |
| var MESSAGES_PER_RESIZE:Int = 500 |
| var DYNAMIC_JAR_REPO = "" |
| var DYNAMIC_JAR_FORCE_PULL = false |
| var CUSTOM_DISPATCHERS: ConfigValue = null |
| var MONITORING_ENABLED: Boolean = true |
| var MONITORING_TYPE: String = "COMPLETE" |
| var PORT = 8080 |
| var URL_PATH = "localhost" |
| |
| def loadUserConfiguration(path: String) : Unit = { |
| val app = { |
| if(path != "" && Files.exists(Paths.get(path))) { |
| ConfigFactory.parseFile(new File(path)).withFallback(ConfigFactory.load()) |
| }else { |
| log.info("Using Fey Default Configuration") |
| log.warn(s"No user configuration defined. Check if your configuration path $path is right.") |
| ConfigFactory.load() |
| } |
| }.getConfig("fey-global-configuration").resolve() |
| |
| CHECKPOINT_DIR = app.getString("checkpoint-directory") |
| JSON_REPOSITORY = app.getString("json-repository") |
| JSON_EXTENSION = app.getString("json-extension") |
| JAR_REPOSITORY = app.getString("jar-repository") |
| CHEKPOINT_ENABLED = app.getBoolean("enable-checkpoint") |
| LOG_LEVEL = app.getString("log-level").toUpperCase() |
| LOG_APPENDER = app.getString("log-appender").toUpperCase() |
| MESSAGES_PER_RESIZE = app.getInt("auto-scale.messages-per-resize") |
| DYNAMIC_JAR_REPO = app.getString("dynamic-jar-population.downloaded-repository") |
| DYNAMIC_JAR_FORCE_PULL = app.getBoolean("dynamic-jar-population.force-pull") |
| CUSTOM_DISPATCHERS = app.getValue("custom-dispatchers") |
| MONITORING_ENABLED = app.getBoolean("monitoring.enable") |
| MONITORING_TYPE = app.getString("monitoring.type").toUpperCase() |
| PORT = app.getInt("port") |
| URL_PATH = app.getString("urlPath") |
| |
| setLogbackConfiguration() |
| } |
| |
| def getDispatcherForAkka():Config = { |
| val config = ConfigFactory.parseString("") |
| config.withValue("fey-custom-dispatchers", CUSTOM_DISPATCHERS) |
| } |
| |
| /** |
| * Resets logback context configuration and loads the new one |
| */ |
| def setLogbackConfiguration() : Unit = { |
| val context: LoggerContext = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext] |
| try { |
| val root = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[Logger] |
| root.setLevel(getLogLevel) |
| setLogAppenders(root) |
| } catch { |
| case e: Exception => log.error("Could not configure logback",e) |
| } |
| StatusPrinter.printInCaseOfErrorsOrWarnings(context) |
| } |
| |
| def setLogAppenders(root: Logger) : Unit = { |
| LOG_APPENDER match { |
| case "FILE" => |
| root.getAppender(CONSOLE_APPENDER).stop() |
| case "STDOUT" => |
| root.getAppender(FILE_APPENDER).stop() |
| case "FILE_STDOUT" => |
| case x => |
| log.warn(s"Appender $x is not defined. Default to FILE_STDOUT") |
| } |
| } |
| |
| def getLogLevel: Level = { |
| LOG_LEVEL match { |
| case "DEBUG" => Level.DEBUG |
| case "INFO" => Level.INFO |
| case "WARN" => Level.WARN |
| case "ERROR" => Level.ERROR |
| case "TRACE" => Level.TRACE |
| case "ALL" => Level.ALL |
| case "OFF" => Level.OFF |
| case x => |
| log.warn(s"Log level $x is not defined. Default to INFO") |
| Level.INFO |
| } |
| } |
| |
| /** |
| * Loads the specification for validating a Fey JSON |
| */ |
| val JSON_SPEC: SchemaType = { |
| Json.fromJson[SchemaType](Json.parse(scala.io.Source |
| .fromInputStream(getClass.getResourceAsStream("/fey-json-schema-validator.json")) |
| .getLines() |
| .mkString(""))).get |
| } |
| } |
| |
| object GLOBAL_DEFINITIONS { |
| val WATCH_SERVICE_THREAD = "FeyWatchService" |
| } |
| |
| |
| |
| case class NetworkAlreadyDefined(message:String) extends Exception(message) |
| case class IllegalPerformerCreation(message:String) extends Exception(message) |
| case class NetworkNotDefined(message:String) extends Exception(message) |
| case class CommandNotRecognized(message:String) extends Exception(message) |
| case class RestartEnsemble(message:String) extends Exception(message) |