blob: c66be1c8c28d9dfd9c654c97b099c35538ef12b1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iota.fey
import java.io.File
import akka.actor.ActorRef
import org.apache.iota.fey.JsonReceiverActor.JSON_RECEIVED
import play.api.libs.json.{JsValue, Json}
import scala.io.Source
/**
* Altough checkpoint processor is not a receiver, it will use the same principle
* as a receiver.
* It will run just once, when the application starts.
* @param receiverActor
*/
class CheckpointProcessor(receiverActor: ActorRef) extends JsonReceiver{
override def run(): Unit = {
processCheckpointFiles()
}
def getJsonObject(params: String): Option[JsValue] = {
try{
val stringJson = Source.fromFile(params).getLines.mkString
Option(Json.parse(stringJson))
}catch{
case e: Exception =>
log.error("Could not parse JSON", e)
None
}
}
private def processJson(path: String, file: File) = {
try{
getJsonObject(path) match {
case Some(orchestrationJSON) =>
val valid = validJson(orchestrationJSON)
if(valid && (orchestrationJSON \ JSON_PATH.COMMAND).as[String].toUpperCase != "DELETE"){
checkForLocation(orchestrationJSON)
}
if(valid) {
receiverActor ! JSON_RECEIVED(orchestrationJSON, file)
}else{
log.warn(s"File $path not processed. Incorrect JSON schema")
}
file.delete()
case None =>
}
} catch {
case e: Exception =>
log.error(s"File $path will not be processed", e)
}
}
private def processCheckpointFiles() = {
Utils.getFilesInDirectory(CONFIG.CHECKPOINT_DIR)
.filter(file => file.getName.endsWith(CONFIG.JSON_EXTENSION))
.foreach(file => {
processJson(file.getAbsolutePath, file)
})
}
override def execute(): Unit = {}
override def exceptionOnRun(e: Exception): Unit = {}
}