/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amaterasu.leader.dsl

import java.util.concurrent.BlockingQueue

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.databind.node.ArrayNode
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.leader.execution.actions.{Action, ErrorAction, SequentialAction}
import org.apache.amaterasu.leader.execution.JobManager
import org.apache.curator.framework.CuratorFramework

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.io.Source
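
/*
 * For reference, an illustrative maki.yml, inferred only from the fields this
 * parser reads (job-name, flow, runner group/type, exports and the optional
 * error block). The job name, action names, file names and values below are
 * made up for the example and are not taken from the Amaterasu documentation:
 *
 *   job-name: example-job
 *   flow:
 *     - name: start
 *       runner:
 *         group: spark
 *         type: scala
 *       file: start.scala
 *       exports:
 *         odd: parquet
 *       error:
 *         name: handle-error
 *         group: spark
 *         type: scala
 *         file: error.scala
 */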

/**
  * The JobParser object is responsible for parsing the maki.yml file, which
  * describes the workflow of an Amaterasu job
  */
object JobParser {

  /**
    * Loads the maki.yml file from the job's repo directory
    */
  def loadMakiFile(): String = {
    Source.fromFile("repo/maki.yml").mkString
  }

  /**
    * Parses the maki.yml string and creates a job manager.
    * A minimal usage sketch follows the method body below.
    *
    * @param jobId the id of the job being parsed
    * @param maki a string containing the YAML definition of the job
    * @param actionsQueue the queue from which executors consume actions
    * @param client the Curator (ZooKeeper) client used by the job's actions
    * @param attempts the number of attempts allowed for each action
    * @return the JobManager for the parsed job
    */
  def parse(jobId: String,
            maki: String,
            actionsQueue: BlockingQueue[ActionData],
            client: CuratorFramework,
            attempts: Int): JobManager = {

    val mapper = new ObjectMapper(new YAMLFactory())
    val job = mapper.readTree(maki)

    // loading the job details
    val manager = JobManager(jobId, job.path("job-name").asText, actionsQueue, client)

    // iterating the flow list and constructing the job's flow
    val actions = job.path("flow").asInstanceOf[ArrayNode].asScala.toSeq
    parseActions(actions, manager, actionsQueue, attempts, null)

    manager
  }
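
  /*
   * A minimal usage sketch, not taken from the project itself: it assumes a
   * ZooKeeper ensemble at localhost:2181 and uses arbitrary retry/attempt
   * values, and it requires org.apache.curator.framework.CuratorFrameworkFactory,
   * org.apache.curator.retry.ExponentialBackoffRetry and
   * java.util.concurrent.LinkedBlockingQueue to be imported:
   *
   *   val client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3))
   *   client.start()
   *   val queue = new LinkedBlockingQueue[ActionData]()
   *   val manager = JobParser.parse("job-0001", JobParser.loadMakiFile(), queue, client, attempts = 3)
   */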

  /**
    * parseActions is a recursive function that builds the workflow of the job
    * God, I miss Clojure
    *
    * @param actions a seq containing the definitions of all the actions
    * @param manager the job manager for the job
    * @param actionsQueue the queue from which executors consume actions
    * @param attempts the number of attempts allowed for each action
    * @param previous the previous action, used to append the current action's id
    *                 to its nextActionIds
    */
  def parseActions(
    actions: Seq[JsonNode],
    manager: JobManager,
    actionsQueue: BlockingQueue[ActionData],
    attempts: Int,
    previous: Action
  ): Unit = {

    if (actions.isEmpty)
      return

    val actionData = actions.head

    val action = parseSequentialAction(
      actionData,
      manager.jobId,
      actionsQueue,
      manager.client,
      attempts
    )

    // updating the list of frameworks setup
    manager.frameworks.getOrElseUpdate(action.data.groupId, new mutable.HashSet[String]())
      .add(action.data.typeId)

    // the first action in the flow becomes the head of the job
    if (manager.head == null)
      manager.head = action

    // linking the action to its predecessor in the flow
    if (previous != null)
      previous.data.nextActionIds.append(action.actionId)

    manager.registerAction(action)

    // if the action defines an error handler, parse and register it as well
    val errorNode = actionData.path("error")
    if (!errorNode.isMissingNode) {

      val errorAction = parseErrorAction(
        errorNode,
        manager.jobId,
        action.data.id,
        actionsQueue,
        manager.client
      )

      action.data.errorActionId = errorAction.data.id
      manager.registerAction(errorAction)

      // updating the list of frameworks setup
      manager.frameworks.getOrElseUpdate(errorAction.data.groupId, new mutable.HashSet[String]())
        .add(errorAction.data.typeId)
    }

    // recurse over the remaining actions, with the current action as the new predecessor
    parseActions(actions.tail, manager, actionsQueue, attempts, action)
  }
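
  /*
   * As an illustration (hypothetical action names): for a flow of three actions
   * a, b and c, the recursion above yields a.nextActionIds == [b's id],
   * b.nextActionIds == [c's id] and manager.head == a, while any "error" block
   * is registered separately and referenced through errorActionId.
   */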

  /**
    * Creates a SequentialAction from the definition of a single flow entry
    */
  def parseSequentialAction(
    action: JsonNode,
    jobId: String,
    actionsQueue: BlockingQueue[ActionData],
    client: CuratorFramework,
    attempts: Int
  ): SequentialAction = {

    SequentialAction(
      action.path("name").asText,
      action.path("file").asText,
      action.path("runner").path("group").asText,
      action.path("runner").path("type").asText,
      action.path("exports").fields().asScala.toSeq.map(e => (e.getKey, e.getValue.asText())).toMap,
      jobId,
      actionsQueue,
      client,
      attempts
    )
  }

  /**
    * Creates an ErrorAction (returned as a SequentialAction) from the error
    * block of an action definition
    */
  def parseErrorAction(
    action: JsonNode,
    jobId: String,
    parent: String,
    actionsQueue: BlockingQueue[ActionData],
    client: CuratorFramework
  ): SequentialAction = {

    ErrorAction(
      action.path("name").asText,
      action.path("file").asText,
      parent,
      action.path("group").asText,
      action.path("type").asText,
      jobId,
      actionsQueue,
      client
    )
  }
}