blob: 771e61828e0e9dd2a945a5f98266f4d454e26469 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.api
import org.apache.logging.log4j.{LogManager, Logger}
import org.apache.wayang.api.serialization.TempFileUtils
import org.apache.wayang.core.api.exception.WayangException
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.reflect.ClassTag
package object async {

  /** Logger for the asynchronous execution helpers of this package. */
  val logger: Logger = LogManager.getLogger(getClass)

  /** Fully-qualified name of the entry point launched in the child JVM by [[runAsyncBody]]. */
  private val ChildMainClass = "org.apache.wayang.api.async.Main"

  /**
   * Runs the given data quanta asynchronously, writing the output to `tempFileOut` as an object file.
   *
   * This is an alias for [[runAsyncWithObjectFileOut]].
   *
   * @param dataQuanta  The data quanta to be executed.
   * @param tempFileOut The path/URL of the (temporary) object file the output is written to.
   * @tparam Out The element type of the data quanta and of the output.
   * @return A future that completes once the asynchronous execution has finished successfully,
   *         and fails if the execution fails.
   * @throws WayangException (synchronously) if the WayangContext is not of type MultiContext
   */
  def runAsyncWithTempFileOut[Out: ClassTag](dataQuanta: DataQuanta[Out], tempFileOut: String): Future[Unit] =
    runAsyncWithObjectFileOut(dataQuanta, tempFileOut)

  /**
   * Runs the given DataQuanta asynchronously and writes the output to a text file.
   *
   * @param dataQuanta the DataQuanta to be executed
   * @param url        the URL of the text file to write the output to
   * @tparam Out the type of the output DataQuanta
   * @return a Future representing the completion of the execution; it fails if the execution fails
   * @throws WayangException (synchronously) if the WayangContext is not of type MultiContext
   */
  def runAsyncWithTextFileOut[Out: ClassTag](dataQuanta: DataQuanta[Out], url: String): Future[Unit] =
    runAsyncWithSink(dataQuanta)(_.withTextFileSink(url))

  /**
   * Runs the given DataQuanta asynchronously and writes the output to an object file specified by the URL.
   *
   * @param dataQuanta the DataQuanta to be executed
   * @param url        the URL of the object file to write the output to
   * @tparam Out the type parameter for the output DataQuanta
   * @return a Future that represents the execution of the DataQuanta; it fails if the execution fails
   * @throws WayangException (synchronously) if the WayangContext is not of type MultiContext
   */
  def runAsyncWithObjectFileOut[Out: ClassTag](dataQuanta: DataQuanta[Out], url: String): Future[Unit] =
    runAsyncWithSink(dataQuanta)(_.withObjectFileSink(url))

  /**
   * Adds a sink to the data quanta's MultiContext via `addSink` and hands off to [[runAsyncBody]].
   *
   * The type check happens before any Future is created, so a non-MultiContext context is reported
   * by throwing directly to the caller (as the public entry points always did).
   */
  private def runAsyncWithSink[Out: ClassTag](dataQuanta: DataQuanta[Out])
                                             (addSink: MultiContext => MultiContext): Future[Unit] =
    dataQuanta.planBuilder.wayangContext match {
      case context: MultiContext =>
        runAsyncBody(dataQuanta, addSink(context))
      case _ =>
        throw new WayangException("WayangContext is not of type MultiContext")
    }

  /**
   * Executes `dataQuanta` in a separate JVM and returns a future for its completion.
   *
   * The plan builder (augmented with the UDF jars of this class) and the operator are serialized to
   * temporary files whose paths are passed to the child's `Main` as arguments; both files are deleted
   * once the child has exited (or if anything fails on the way). The child's stdout/stderr are
   * forwarded to this process.
   *
   * NOTE(review): `multiContext` is not read here; the sink configuration presumably reaches the child
   * through `dataQuanta.planBuilder` (e.g. if `withTextFileSink` updates the context in place) — confirm.
   *
   * @param dataQuanta   the DataQuanta to be executed
   * @param multiContext the context the caller prepared for this execution (see note above)
   * @tparam Out the type of the DataQuanta
   * @return a Future that succeeds when the child JVM exits with status 0, and fails with a
   *         [[WayangException]] carrying the exit code otherwise (or with any exception raised while
   *         serializing the inputs or starting the process)
   */
  def runAsyncBody[Out: ClassTag](dataQuanta: DataQuanta[Out], multiContext: MultiContext): Future[Unit] = Future {
    import scala.concurrent.blocking

    // One try/finally per temp file so that a failure while writing the second file
    // does not leak the first one.
    val planBuilderPath = TempFileUtils.writeToTempFileAsString(dataQuanta.planBuilder.withUdfJarsOf(this.getClass))
    try {
      val operatorPath = TempFileUtils.writeToTempFileAsString(dataQuanta.operator)
      try {
        // The wait can take as long as the whole job; mark it so the global pool can compensate.
        val exitCode = blocking {
          runChildJvm(operatorPath.toString, planBuilderPath.toString)
        }
        if (exitCode != 0) {
          throw new WayangException(s"Asynchronous execution failed: child process exited with code $exitCode")
        }
      } finally {
        Files.deleteIfExists(operatorPath)
      }
    } finally {
      Files.deleteIfExists(planBuilderPath)
    }
  }

  /**
   * Launches [[ChildMainClass]] in a fresh JVM with the parent's classpath, waits for it, and
   * returns its exit code.
   */
  private def runChildJvm(operatorPath: String, planBuilderPath: String): Int = {
    // Use the parent's own JVM binary so the forwarded classpath runs on a compatible runtime,
    // instead of whatever `java` happens to be first on PATH.
    val javaExecutable = Paths.get(System.getProperty("java.home"), "bin", "java").toString
    val classpath = System.getProperty("java.class.path") // get classpath from parent JVM
    val process = new ProcessBuilder(javaExecutable, "-cp", classpath, ChildMainClass, operatorPath, planBuilderPath)
      .redirectOutput(ProcessBuilder.Redirect.INHERIT)
      .redirectError(ProcessBuilder.Redirect.INHERIT)
      .start()
    try process.waitFor()
    finally {
      // Only still alive if waitFor was interrupted/aborted: don't leave an orphaned child JVM behind.
      if (process.isAlive) process.destroyForcibly()
    }
  }
}