merged with master
diff --git a/docs/docs/config.md b/docs/docs/config.md
index f49186a..52adfc2 100644
--- a/docs/docs/config.md
+++ b/docs/docs/config.md
@@ -65,6 +65,7 @@
file: start.py
```
+
## Configuration Types
Amaterasu allows the configuration of three main areas:
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
index 7a32b9e..e3e503f 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
@@ -56,8 +56,8 @@
override val groupResources: Array<File> by lazy {
when (conf.mode()) {
- "mesos" -> arrayOf(File("spark-${conf.webserver().sparkVersion()}.tgz"), File("spark-runner-${conf.version()}-all.jar"), File("spark-runtime-${conf.version()}.jar"))
- "yarn" -> arrayOf(File("spark-runner-${conf.version()}-all.jar"), File("spark-runtime-${conf.version()}.jar"), File("executor-${conf.version()}-all.jar"), File(conf.spark().home()))
+ "mesos" -> arrayOf(File("spark-${conf.webserver().sparkVersion()}.tgz"), File("spark-runtime-${conf.version()}.jar"))
+ "yarn" -> arrayOf(File("spark-runtime-${conf.version()}.jar"), File("executor-${conf.version()}-all.jar"), File(conf.spark().home()))
else -> arrayOf()
}
}
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
index d716427..c4a4f93 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
@@ -31,8 +31,8 @@
val hive = if (conf.mode() == "yarn") "--files \$SPARK_HOME/conf/hive-site.xml " else ""
val master = if (!sparkOptions.containsKey("master")) " --master ${env[Job.master]} " else ""
- return "$command && \$SPARK_HOME/bin/spark-submit $master" +
- SparkCommandLineHelper.getOptions(sparkOptions) +
+ return "$command && \$SPARK_HOME/bin/spark-submit $master " +
+ SparkCommandLineHelper.getOptions(sparkOptions) + " " +
SparkCommandLineHelper.getProperties(sparkProperties) + " " +
"--conf spark.pyspark.python=${conf.pythonPath()} " +
"--conf spark.pyspark.driver.python=$virtualPythonPath " +
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt.orig b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt.orig
new file mode 100644
index 0000000..daaab40
--- /dev/null
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt.orig
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+
+import com.uchuhimo.konf.Config
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
+import org.apache.amaterasu.common.configuration.Job
+
+class PySparkRunnerProvider(env: String, conf: ClusterConfig) : PythonRunnerProviderBase(env, conf) {
+
+ override fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String {
+ val sparkProperties: Map<String, Any> = env["sparkProperties"]
+ val sparkOptions: Map<String, Any> = env["sparkOptions"]
+ val command = super.getCommand(jobId, actionData, env, executorId, callbackAddress)
+ val hive = if (conf.mode() == "yarn") "--files \$SPARK_HOME/conf/hive-site.xml " else ""
+ val master = if (!sparkOptions.containsKey("master")) " --master ${env[Job.master]} " else ""
+
+<<<<<<< HEAD
+ return "$command && \$SPARK_HOME/bin/spark-submit $master " +
+ SparkCommandLineHelper.getOptions(sparkOptions) + " " +
+=======
+ return "$command && \$SPARK_HOME/bin/spark-submit $master" +
+ SparkCommandLineHelper.getOptions(sparkOptions) +
+>>>>>>> upstream/master
+ SparkCommandLineHelper.getProperties(sparkProperties) + " " +
+ "--conf spark.pyspark.python=${conf.pythonPath()} " +
+ "--conf spark.pyspark.driver.python=$virtualPythonPath " +
+ hive +
+ " ${actionData.src}"
+ }
+
+ override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf()
+
+ override val runnerResources: Array<String>
+ get() {
+ var resources = super.runnerResources
+ resources += "amaterasu_pyspark-${conf.version()}.zip"
+ //log.info("PYSPARK RESOURCES ==> ${resources.toSet}")
+ return resources
+ }
+
+ override val hasExecutor: Boolean = false
+}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
index e60eb11..ccfa6f0 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
@@ -19,10 +19,10 @@
object SparkCommandLineHelper {
fun getOptions(sparkOptions: Map<String, Any>): String {
- return sparkOptions.map { "--${it.key} ${it.value}" }.joinToString( separator = " ")
+ return sparkOptions.map { "--${it.key} ${it.value}" }.joinToString( separator = " ") + " "
}
fun getProperties(sparkProps: Map<String, Any>): String{
- return sparkProps.map { "--conf $it" }.joinToString( separator = " ")
+ return sparkProps.map { "--conf $it" }.joinToString( separator = " ") + " "
}
}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
index a607d75..f413fd1 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
@@ -20,7 +20,6 @@
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.common.utils.ArtifactUtil
-import org.apache.amaterasu.common.configuration.ConfigManager
import org.apache.amaterasu.common.configuration.Job
import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
@@ -35,8 +34,8 @@
val master = if (!sparkOptions.containsKey("master")) " --master ${env[Job.master]} " else ""
return "\$SPARK_HOME/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.artifact).first().name} " +
- SparkCommandLineHelper.getOptions(sparkOptions) +
- SparkCommandLineHelper.getProperties(sparkProperties) +
+ SparkCommandLineHelper.getOptions(sparkOptions) + " " +
+ SparkCommandLineHelper.getProperties(sparkProperties) + " " +
master +
" --jars spark-runtime-${conf.version()}.jar >&1"
}
diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
deleted file mode 100644
index b363058..0000000
--- a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/build.gradle b/frameworks/spark/runner/build.gradle
deleted file mode 100644
index 3a612f2..0000000
--- a/frameworks/spark/runner/build.gradle
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-plugins {
- id 'com.github.johnrengelman.shadow' version '1.2.4'
- id 'com.github.maiflai.scalatest' version '0.22'
- id 'scala'
- id 'java'
-}
-
-shadowJar {
- zip64 true
-}
-
-repositories {
- maven {
- url "https://plugins.gradle.org/m2/"
- }
- mavenCentral()
-}
-
-test {
- maxParallelForks = 1
- forkEvery = 1
-}
-
-configurations {
- provided
-}
-
-sourceSets {
- main.compileClasspath += configurations.provided
- test.compileClasspath += configurations.provided
- test.runtimeClasspath += configurations.provided
-}
-
-dependencies {
-
- compile project(':executor')
- compile project(':spark-runtime')
- compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
- compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
- compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8'
- compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final'
- compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
- compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
- compile group: 'org.reflections', name: 'reflections', version: '0.9.10'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.5'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.5'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.5'
- compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.5'
- compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
-
- compile('com.jcabi:jcabi-aether:0.10.1') {
- exclude group: 'org.jboss.netty'
- }
- compile('org.apache.activemq:activemq-client:5.15.2') {
- exclude group: 'org.jboss.netty'
- }
-
- //runtime dependency for spark
- provided('org.apache.spark:spark-repl_2.11:2.2.1')
- provided('org.apache.spark:spark-core_2.11:2.2.1')
-
- testCompile project(':common')
- testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
- testRuntime 'org.pegdown:pegdown:1.1.0'
- testCompile 'junit:junit:4.11'
- testCompile 'org.scalatest:scalatest_2.11:3.0.2'
- testCompile 'org.scala-lang:scala-library:2.11.8'
- testCompile('org.apache.spark:spark-repl_2.11:2.2.1')
- testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9'
-
-}
-
-sourceSets {
- test {
- resources.srcDirs += [file('src/test/resources')]
- }
-
- main {
- scala {
- srcDirs = ['src/main/scala', 'src/main/java']
- }
- java {
- srcDirs = []
- }
- }
-}
-
-test {
-
- maxParallelForks = 1
-}
-
-task copyToHome(type: Copy) {
- dependsOn shadowJar
- from 'build/libs'
- into '../../../build/amaterasu/dist'
- from 'build/resources/main'
- into '../../../build/amaterasu/dist'
-}
diff --git a/frameworks/spark/runner/src/main/java/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkEntryPoint.java b/frameworks/spark/runner/src/main/java/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkEntryPoint.java
deleted file mode 100755
index 491d771..0000000
--- a/frameworks/spark/runner/src/main/java/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkEntryPoint.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark;
-
-import org.apache.amaterasu.common.runtime.Environment;
-import org.apache.amaterasu.frameworks.spark.runtime.AmaContext;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkEnv;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SparkSession;
-import py4j.GatewayServer;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class PySparkEntryPoint {
-
- //private static Boolean Started = false;
- private static PySparkExecutionQueue queue = new PySparkExecutionQueue();
- private static ConcurrentHashMap<String, ResultQueue> resultQueues = new ConcurrentHashMap<>();
-
- private static int port = 0;
- private static SparkSession sparkSession = null;
- private static JavaSparkContext jsc = null;
- private static SQLContext sqlContext = null;
- private static SparkEnv sparkEnv = null;
-
- public static PySparkExecutionQueue getExecutionQueue() {
- return queue;
- }
-
- public static ResultQueue getResultQueue(String actionName) {
- resultQueues.putIfAbsent(actionName, new ResultQueue());
- return resultQueues.get(actionName);
- }
-
- public static JavaSparkContext getJavaSparkContext() {
- SparkEnv.set(sparkEnv);
- return jsc;
- }
-
- public static String getJobId(){
- return AmaContext.jobId();
- }
-
- public static Environment getEnv(){
- return AmaContext.env();
- }
-
- public static SQLContext getSqlContext() {
- SparkEnv.set(sparkEnv);
- return sqlContext;
- }
-
- public static SparkSession getSparkSession() {
- SparkEnv.set(sparkEnv);
- return sparkSession;
- }
-
- public static SparkConf getSparkConf() {
- return jsc.getConf();
- }
-
- private static void generatePort() {
-
- try {
-
- ServerSocket socket = new ServerSocket(0);
- port = socket.getLocalPort();
-
- socket.close();
-
- } catch (IOException e) {
- }
-
- }
-
- public static int getPort() {
- return port;
- }
-
- public static void start(SparkSession spark,
- String jobName,
- Environment env,
- SparkEnv sparkEnv) {
-
- AmaContext.init(spark, jobName, env);
-
- sparkSession = spark;
- jsc = new JavaSparkContext(spark.sparkContext());
- sqlContext = spark.sqlContext();
- PySparkEntryPoint.sparkEnv = sparkEnv;
- generatePort();
- GatewayServer gatewayServer = new GatewayServer(new PySparkEntryPoint(), port);
-
- gatewayServer.start();
- }
-}
diff --git a/frameworks/spark/runner/src/main/resources/codegen.py b/frameworks/spark/runner/src/main/resources/codegen.py
deleted file mode 100644
index 113d9be..0000000
--- a/frameworks/spark/runner/src/main/resources/codegen.py
+++ /dev/null
@@ -1,577 +0,0 @@
-"""
- codegen
- ~~~~~~~
-
- Extension to ast that allow ast -> python code generation.
-
- :copyright: Copyright 2008 by Armin Ronacher.
- :license: BSD.
-"""
-from ast import *
-
-BINOP_SYMBOLS = {}
-BINOP_SYMBOLS[Add] = '+'
-BINOP_SYMBOLS[Sub] = '-'
-BINOP_SYMBOLS[Mult] = '*'
-BINOP_SYMBOLS[Div] = '/'
-BINOP_SYMBOLS[Mod] = '%'
-BINOP_SYMBOLS[Pow] = '**'
-BINOP_SYMBOLS[LShift] = '<<'
-BINOP_SYMBOLS[RShift] = '>>'
-BINOP_SYMBOLS[BitOr] = '|'
-BINOP_SYMBOLS[BitXor] = '^'
-BINOP_SYMBOLS[BitAnd] = '&'
-BINOP_SYMBOLS[FloorDiv] = '//'
-
-BOOLOP_SYMBOLS = {}
-BOOLOP_SYMBOLS[And] = 'and'
-BOOLOP_SYMBOLS[Or] = 'or'
-
-CMPOP_SYMBOLS = {}
-CMPOP_SYMBOLS[Eq] = '=='
-CMPOP_SYMBOLS[NotEq] = '!='
-CMPOP_SYMBOLS[Lt] = '<'
-CMPOP_SYMBOLS[LtE] = '<='
-CMPOP_SYMBOLS[Gt] = '>'
-CMPOP_SYMBOLS[GtE] = '>='
-CMPOP_SYMBOLS[Is] = 'is'
-CMPOP_SYMBOLS[IsNot] = 'is not'
-CMPOP_SYMBOLS[In] = 'in'
-CMPOP_SYMBOLS[NotIn] = 'not in'
-
-UNARYOP_SYMBOLS = {}
-UNARYOP_SYMBOLS[Invert] = '~'
-UNARYOP_SYMBOLS[Not] = 'not'
-UNARYOP_SYMBOLS[UAdd] = '+'
-UNARYOP_SYMBOLS[USub] = '-'
-
-
-def to_source(node, indent_with=' ' * 4, add_line_information=False):
- """This function can convert a node tree back into python sourcecode.
- This is useful for debugging purposes, especially if you're dealing with
- custom asts not generated by python itself.
-
- It could be that the sourcecode is evaluable when the AST itself is not
- compilable / evaluable. The reason for this is that the AST contains some
- more data than regular sourcecode does, which is dropped during
- conversion.
-
- Each level of indentation is replaced with `indent_with`. Per default this
- parameter is equal to four spaces as suggested by PEP 8, but it might be
- adjusted to match the application's styleguide.
-
- If `add_line_information` is set to `True` comments for the line numbers
- of the nodes are added to the output. This can be used to spot wrong line
- number information of statement nodes.
- """
- generator = SourceGenerator(indent_with, add_line_information)
- generator.visit(node)
-
- return ''.join(generator.result)
-
-class SourceGenerator(NodeVisitor):
- """This visitor is able to transform a well formed syntax tree into python
- sourcecode. For more details have a look at the docstring of the
- `node_to_source` function.
- """
-
- def __init__(self, indent_with, add_line_information=False):
- self.result = []
- self.indent_with = indent_with
- self.add_line_information = add_line_information
- self.indentation = 0
- self.new_lines = 0
-
- def write(self, x):
- if self.new_lines:
- if self.result:
- self.result.append('\n' * self.new_lines)
- self.result.append(self.indent_with * self.indentation)
- self.new_lines = 0
- self.result.append(x)
-
- def newline(self, node=None, extra=0):
- self.new_lines = max(self.new_lines, 1 + extra)
- if node is not None and self.add_line_information:
- self.write('# line: %s' % node.lineno)
- self.new_lines = 1
-
- def body(self, statements):
- self.new_line = True
- self.indentation += 1
- for stmt in statements:
- self.visit(stmt)
- self.indentation -= 1
-
- def body_or_else(self, node):
- self.body(node.body)
- if node.orelse:
- self.newline()
- self.write('else:')
- self.body(node.orelse)
-
- def signature(self, node):
- want_comma = []
- def write_comma():
- if want_comma:
- self.write(', ')
- else:
- want_comma.append(True)
-
- padding = [None] * (len(node.args) - len(node.defaults))
- for arg, default in zip(node.args, padding + node.defaults):
- write_comma()
- self.visit(arg)
- if default is not None:
- self.write('=')
- self.visit(default)
- if node.vararg is not None:
- write_comma()
- self.write('*' + node.vararg)
- if node.kwarg is not None:
- write_comma()
- self.write('**' + node.kwarg)
-
- def decorators(self, node):
- for decorator in node.decorator_list:
- self.newline(decorator)
- self.write('@')
- self.visit(decorator)
-
- # Statements
-
- def visit_Assert(self, node):
- self.newline(node)
- self.write('assert ')
- self.visit(node.test)
- if node.msg is not None:
- self.write(', ')
- self.visit(node.msg)
-
- def visit_Assign(self, node):
- self.newline(node)
- for idx, target in enumerate(node.targets):
- if idx:
- self.write(', ')
- self.visit(target)
- self.write(' = ')
- self.visit(node.value)
-
- def visit_AugAssign(self, node):
- self.newline(node)
- self.visit(node.target)
- self.write(' ' + BINOP_SYMBOLS[type(node.op)] + '= ')
- self.visit(node.value)
-
- def visit_ImportFrom(self, node):
- self.newline(node)
- self.write('from %s%s import ' % ('.' * node.level, node.module))
- for idx, item in enumerate(node.names):
- if idx:
- self.write(', ')
- self.write(item)
-
- def visit_Import(self, node):
- self.newline(node)
- for item in node.names:
- self.write('import ')
- self.visit(item)
-
- def visit_Expr(self, node):
- self.newline(node)
- self.generic_visit(node)
-
- def visit_FunctionDef(self, node):
- self.newline(extra=1)
- self.decorators(node)
- self.newline(node)
- self.write('def %s(' % node.name)
- self.visit(node.args)
- self.write('):')
- self.body(node.body)
-
- def visit_ClassDef(self, node):
- have_args = []
- def paren_or_comma():
- if have_args:
- self.write(', ')
- else:
- have_args.append(True)
- self.write('(')
-
- self.newline(extra=2)
- self.decorators(node)
- self.newline(node)
- self.write('class %s' % node.name)
- for base in node.bases:
- paren_or_comma()
- self.visit(base)
- # XXX: the if here is used to keep this module compatible
- # with python 2.6.
- if hasattr(node, 'keywords'):
- for keyword in node.keywords:
- paren_or_comma()
- self.write(keyword.arg + '=')
- self.visit(keyword.value)
- if node.starargs is not None:
- paren_or_comma()
- self.write('*')
- self.visit(node.starargs)
- if node.kwargs is not None:
- paren_or_comma()
- self.write('**')
- self.visit(node.kwargs)
- self.write(have_args and '):' or ':')
- self.body(node.body)
-
- def visit_If(self, node):
- self.newline(node)
- self.write('if ')
- self.visit(node.test)
- self.write(':')
- self.body(node.body)
- while True:
- else_ = node.orelse
- if len(else_) == 0:
- break
- elif len(else_) == 1 and isinstance(else_[0], If):
- node = else_[0]
- self.newline()
- self.write('elif ')
- self.visit(node.test)
- self.write(':')
- self.body(node.body)
- else:
- self.newline()
- self.write('else:')
- self.body(else_)
- break
-
- def visit_For(self, node):
- self.newline(node)
- self.write('for ')
- self.visit(node.target)
- self.write(' in ')
- self.visit(node.iter)
- self.write(':')
- self.body_or_else(node)
-
- def visit_While(self, node):
- self.newline(node)
- self.write('while ')
- self.visit(node.test)
- self.write(':')
- self.body_or_else(node)
-
- def visit_With(self, node):
- self.newline(node)
- self.write('with ')
- self.visit(node.context_expr)
- if node.optional_vars is not None:
- self.write(' as ')
- self.visit(node.optional_vars)
- self.write(':')
- self.body(node.body)
-
- def visit_Pass(self, node):
- self.newline(node)
- self.write('pass')
-
- def visit_Print(self, node):
- # XXX: python 2.6 only
- self.newline(node)
- self.write('print ')
- want_comma = False
- if node.dest is not None:
- self.write(' >> ')
- self.visit(node.dest)
- want_comma = True
- for value in node.values:
- if want_comma:
- self.write(', ')
- self.visit(value)
- want_comma = True
- if not node.nl:
- self.write(',')
-
- def visit_Delete(self, node):
- self.newline(node)
- self.write('del ')
- for idx, target in enumerate(node):
- if idx:
- self.write(', ')
- self.visit(target)
-
- def visit_TryExcept(self, node):
- self.newline(node)
- self.write('try:')
- self.body(node.body)
- for handler in node.handlers:
- self.visit(handler)
-
- def visit_TryFinally(self, node):
- self.newline(node)
- self.write('try:')
- self.body(node.body)
- self.newline(node)
- self.write('finally:')
- self.body(node.finalbody)
-
- def visit_Global(self, node):
- self.newline(node)
- self.write('global ' + ', '.join(node.names))
-
- def visit_Nonlocal(self, node):
- self.newline(node)
- self.write('nonlocal ' + ', '.join(node.names))
-
- def visit_Return(self, node):
- self.newline(node)
- if node.value is None:
- self.write('return')
- else:
- self.write('return ')
- self.visit(node.value)
-
- def visit_Break(self, node):
- self.newline(node)
- self.write('break')
-
- def visit_Continue(self, node):
- self.newline(node)
- self.write('continue')
-
- def visit_Raise(self, node):
- # XXX: Python 2.6 / 3.0 compatibility
- self.newline(node)
- self.write('raise')
- if hasattr(node, 'exc') and node.exc is not None:
- self.write(' ')
- self.visit(node.exc)
- if node.cause is not None:
- self.write(' from ')
- self.visit(node.cause)
- elif hasattr(node, 'type') and node.type is not None:
- self.visit(node.type)
- if node.inst is not None:
- self.write(', ')
- self.visit(node.inst)
- if node.tback is not None:
- self.write(', ')
- self.visit(node.tback)
-
- # Expressions
-
- def visit_Attribute(self, node):
- self.visit(node.value)
- self.write('.' + node.attr)
-
- def visit_Call(self, node):
- want_comma = []
- def write_comma():
- if want_comma:
- self.write(', ')
- else:
- want_comma.append(True)
-
- self.visit(node.func)
- self.write('(')
- for arg in node.args:
- write_comma()
- self.visit(arg)
- for keyword in node.keywords:
- write_comma()
- self.write(keyword.arg + '=')
- self.visit(keyword.value)
- if node.starargs is not None:
- write_comma()
- self.write('*')
- self.visit(node.starargs)
- if node.kwargs is not None:
- write_comma()
- self.write('**')
- self.visit(node.kwargs)
- self.write(')')
-
- def visit_Name(self, node):
- self.write(node.id)
-
- def visit_Str(self, node):
- self.write(repr(node.s))
-
- def visit_Bytes(self, node):
- self.write(repr(node.s))
-
- def visit_Num(self, node):
- self.write(repr(node.n))
-
- def visit_Tuple(self, node):
- self.write('(')
- idx = -1
- for idx, item in enumerate(node.elts):
- if idx:
- self.write(', ')
- self.visit(item)
- self.write(idx and ')' or ',)')
-
- def sequence_visit(left, right):
- def visit(self, node):
- self.write(left)
- for idx, item in enumerate(node.elts):
- if idx:
- self.write(', ')
- self.visit(item)
- self.write(right)
- return visit
-
- visit_List = sequence_visit('[', ']')
- visit_Set = sequence_visit('{', '}')
- del sequence_visit
-
- def visit_Dict(self, node):
- self.write('{')
- for idx, (key, value) in enumerate(zip(node.keys, node.values)):
- if idx:
- self.write(', ')
- self.visit(key)
- self.write(': ')
- self.visit(value)
- self.write('}')
-
- def visit_BinOp(self, node):
- self.visit(node.left)
- self.write(' %s ' % BINOP_SYMBOLS[type(node.op)])
- self.visit(node.right)
-
- def visit_BoolOp(self, node):
- self.write('(')
- for idx, value in enumerate(node.values):
- if idx:
- self.write(' %s ' % BOOLOP_SYMBOLS[type(node.op)])
- self.visit(value)
- self.write(')')
-
- def visit_Compare(self, node):
- self.write('(')
- self.visit(node.left)
- for op, right in zip(node.ops, node.comparators):
- self.write(' %s ' % CMPOP_SYMBOLS[type(op)])
- self.visit(right)
- self.write(')')
-
- def visit_UnaryOp(self, node):
- self.write('(')
- op = UNARYOP_SYMBOLS[type(node.op)]
- self.write(op)
- if op == 'not':
- self.write(' ')
- self.visit(node.operand)
- self.write(')')
-
- def visit_Subscript(self, node):
- self.visit(node.value)
- self.write('[')
- self.visit(node.slice)
- self.write(']')
-
- def visit_Slice(self, node):
- if node.lower is not None:
- self.visit(node.lower)
- self.write(':')
- if node.upper is not None:
- self.visit(node.upper)
- if node.step is not None:
- self.write(':')
- if not (isinstance(node.step, Name) and node.step.id == 'None'):
- self.visit(node.step)
-
- def visit_ExtSlice(self, node):
- for idx, item in node.dims:
- if idx:
- self.write(', ')
- self.visit(item)
-
- def visit_Yield(self, node):
- self.write('yield ')
- self.visit(node.value)
-
- def visit_Lambda(self, node):
- self.write('lambda ')
- self.visit(node.args)
- self.write(': ')
- self.visit(node.body)
-
- def visit_Ellipsis(self, node):
- self.write('Ellipsis')
-
- def generator_visit(left, right):
- def visit(self, node):
- self.write(left)
- self.visit(node.elt)
- for comprehension in node.generators:
- self.visit(comprehension)
- self.write(right)
- return visit
-
- visit_ListComp = generator_visit('[', ']')
- visit_GeneratorExp = generator_visit('(', ')')
- visit_SetComp = generator_visit('{', '}')
- del generator_visit
-
- def visit_DictComp(self, node):
- self.write('{')
- self.visit(node.key)
- self.write(': ')
- self.visit(node.value)
- for comprehension in node.generators:
- self.visit(comprehension)
- self.write('}')
-
- def visit_IfExp(self, node):
- self.visit(node.body)
- self.write(' if ')
- self.visit(node.test)
- self.write(' else ')
- self.visit(node.orelse)
-
- def visit_Starred(self, node):
- self.write('*')
- self.visit(node.value)
-
- def visit_Repr(self, node):
- # XXX: python 2.6 only
- self.write('`')
- self.visit(node.value)
- self.write('`')
-
- # Helper Nodes
-
- def visit_alias(self, node):
- self.write(node.name)
- if node.asname is not None:
- self.write(' as ' + node.asname)
-
- def visit_comprehension(self, node):
- self.write(' for ')
- self.visit(node.target)
- self.write(' in ')
- self.visit(node.iter)
- if node.ifs:
- for if_ in node.ifs:
- self.write(' if ')
- self.visit(if_)
-
- def visit_excepthandler(self, node):
- self.newline(node)
- self.write('except')
- if node.type is not None:
- self.write(' ')
- self.visit(node.type)
- if node.name is not None:
- self.write(' as ')
- self.visit(node.name)
- self.write(':')
- self.body(node.body)
-
- def visit_arguments(self, node):
- self.signature(node)
diff --git a/frameworks/spark/runner/src/main/resources/runtime.py b/frameworks/spark/runner/src/main/resources/runtime.py
deleted file mode 100644
index d01664c..0000000
--- a/frameworks/spark/runner/src/main/resources/runtime.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class AmaContext(object):
-
- def __init__(self, sc, spark, job_id, env):
- self.sc = sc
- self.spark = spark
- self.job_id = job_id
- self.env = env
-
- def get_dataframe(self, action_name, dataset_name, format = "parquet"):
- return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
-
-class Environment(object):
-
- def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
- self.name = name
- self.master = master
- self.input_root_path = input_root_path
- self.output_root_path = output_root_path
- self.working_dir = working_dir
- self.configuration = configuration
diff --git a/frameworks/spark/runner/src/main/resources/spark-version-info.properties b/frameworks/spark/runner/src/main/resources/spark-version-info.properties
deleted file mode 100644
index ce0b312..0000000
--- a/frameworks/spark/runner/src/main/resources/spark-version-info.properties
+++ /dev/null
@@ -1,11 +0,0 @@
-version=2.1.0-SNAPSHOT
-
-user=root
-
-revision=738b4cc548ca48c010b682b8bc19a2f7e1947cfe
-
-branch=master
-
-date=2016-07-27T11:23:21Z
-
-url=https://github.com/apache/spark.git
diff --git a/frameworks/spark/runner/src/main/resources/spark_intp.py b/frameworks/spark/runner/src/main/resources/spark_intp.py
deleted file mode 100755
index f3c9fc0..0000000
--- a/frameworks/spark/runner/src/main/resources/spark_intp.py
+++ /dev/null
@@ -1,110 +0,0 @@
-#!/usr/bin/python
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import ast
-import codegen
-import os
-import sys
-import zipimport
-sys.path.append(os.getcwd())
-from runtime import AmaContext, Environment
-
-# os.chdir(os.getcwd() + '/build/resources/test/')
-# import zipfile
-# zip = zipfile.ZipFile('pyspark.zip')
-# zip.extractall()
-# zip = zipfile.ZipFile('py4j-0.10.4-src.zip', 'r')
-# zip.extractall()
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
-
-# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
-# py4j_importer = zipimport.zipimporter(py4j_path)
-# py4j = py4j_importer.load_module('py4j')
-from py4j.java_gateway import JavaGateway, GatewayClient, java_import
-from py4j.protocol import Py4JJavaError
-from pyspark.conf import SparkConf
-from pyspark.context import SparkContext
-from pyspark.rdd import RDD
-from pyspark.files import SparkFiles
-from pyspark.storagelevel import StorageLevel
-from pyspark import accumulators
-from pyspark.accumulators import Accumulator, AccumulatorParam
-from pyspark.broadcast import Broadcast
-from pyspark.serializers import MarshalSerializer, PickleSerializer
-from pyspark.sql import SparkSession
-from pyspark.sql import Row
-
-client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client, auto_convert=True)
-entry_point = gateway.entry_point
-queue = entry_point.getExecutionQueue()
-
-java_import(gateway.jvm, "org.apache.spark.SparkEnv")
-java_import(gateway.jvm, "org.apache.spark.SparkConf")
-java_import(gateway.jvm, "org.apache.spark.api.java.*")
-java_import(gateway.jvm, "org.apache.spark.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.sql.*")
-java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
-java_import(gateway.jvm, "scala.Tuple2")
-
-jconf = entry_point.getSparkConf()
-jsc = entry_point.getJavaSparkContext()
-
-job_id = entry_point.getJobId()
-javaEnv = entry_point.getEnv()
-
-env = Environment(javaEnv.name(), javaEnv.master(), javaEnv.inputRootPath(), javaEnv.outputRootPath(), javaEnv.workingDir(), javaEnv.configuration())
-conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
-conf.setExecutorEnv('PYTHONPATH', ':'.join(sys.path))
-sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-spark = SparkSession(sc, entry_point.getSparkSession())
-
-ama_context = AmaContext(sc, spark, job_id, env)
-
-while True:
- actionData = queue.getNext()
- resultQueue = entry_point.getResultQueue(actionData._2())
- actionSource = actionData._1()
- tree = ast.parse(actionSource)
- exports = actionData._3()
-
- for node in tree.body:
-
- wrapper = ast.Module(body=[node])
- try:
- co = compile(wrapper, "<ast>", 'exec')
- exec (co)
- resultQueue.put('success', actionData._2(), codegen.to_source(node), '')
-
- #if this node is an assignment, we need to check if it needs to be persisted
- try:
- persistCode = ''
- if(isinstance(node,ast.Assign)):
- varName = node.targets[0].id
- if(exports.containsKey(varName)):
- persistCode = varName + ".write.save(\"" + env.working_dir + "/" + job_id + "/" + actionData._2() + "/" + varName + "\", format=\"" + exports[varName] + "\", mode='overwrite')"
- persist = compile(persistCode, '<stdin>', 'exec')
- exec(persist)
-
- except:
- resultQueue.put('error', actionData._2(), persistCode, str(sys.exc_info()[1]))
- except:
- resultQueue.put('error', actionData._2(), codegen.to_source(node), str(sys.exc_info()[1]))
- resultQueue.put('completion', '', '', '')
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
deleted file mode 100644
index 06dfa0d..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner
-
-import java.io._
-
-import com.jcabi.aether.Aether
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ExecData
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.frameworks.spark.runner.sparksql.SparkSqlRunner
-import org.apache.amaterasu.frameworks.spark.runner.pyspark.PySparkRunner
-import org.apache.amaterasu.frameworks.spark.runner.repl.{SparkRunnerHelper, SparkScalaRunner}
-import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
-import org.eclipse.aether.util.artifact.JavaScopes
-import org.sonatype.aether.repository.RemoteRepository
-import org.sonatype.aether.util.artifact.DefaultArtifact
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.concurrent.TrieMap
-import scala.sys.process._
-
-class SparkRunnersProvider extends Logging with RunnersProvider {
-
- private val runners = new TrieMap[String, AmaterasuRunner]
- private var shellLoger = ProcessLogger(
- (o: String) => log.info(o),
- (e: String) => log.error(e)
-
- )
- private var conf: Option[Map[String, AnyRef]] = None
- private var executorEnv: Option[Map[String, AnyRef]] = None
- private var clusterConfig: ClusterConfig = _
-
- override def init(execData: ExecData,
- jobId: String,
- outStream: ByteArrayOutputStream,
- notifier: Notifier,
- executorId: String,
- config: ClusterConfig,
- hostName: String): Unit = {
-
- shellLoger = ProcessLogger(
- (o: String) => log.info(o),
- (e: String) => log.error("", e)
- )
- clusterConfig = config
- var jars = Seq.empty[String]
-
- if (execData.getDeps != null) {
- jars ++= getDependencies(execData.getDeps)
- }
-
- if (execData.getPyDeps != null &&
- execData.getPyDeps.getFilePaths.nonEmpty) {
- loadPythonDependencies(execData.getPyDeps, notifier)
- }
-
- if(execData.getConfigurations.containsKey("spark")){
- conf = Some(execData.getConfigurations.get("spark").toMap)
- }
-
- if (execData.getConfigurations.containsKey("spark_exec_env")) {
- executorEnv = Some( execData.getConfigurations.get("spark_exec_env").toMap)
- }
- val sparkAppName = s"job_${jobId}_executor_$executorId"
-
- SparkRunnerHelper.notifier = notifier
- val spark = SparkRunnerHelper.createSpark(execData.getEnv, sparkAppName, jars, conf, executorEnv, config, hostName)
-
- lazy val sparkScalaRunner = SparkScalaRunner(execData.getEnv, jobId, spark, outStream, notifier, jars)
- sparkScalaRunner.initializeAmaContext(execData.getEnv)
-
- runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
- var pypath = ""
- // TODO: get rid of hard-coded version
- config.mode match {
- case "yarn" =>
- pypath = s"$$PYTHONPATH:$$SPARK_HOME/python:$$SPARK_HOME/python/build:${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip:${new File(".").getAbsolutePath}"
- case "mesos" =>
- pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
- }
- lazy val pySparkRunner = PySparkRunner(execData.getEnv, jobId, notifier, spark, pypath, execData.getPyDeps, config)
- runners.put(pySparkRunner.getIdentifier, pySparkRunner)
-
- lazy val sparkSqlRunner = SparkSqlRunner(execData.getEnv, jobId, notifier, spark)
- runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner)
- }
-
- private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
- val channel = pythonPackage.getChannel
- if (channel == "anaconda") {
- Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.getPackageId}") ! shellLoger
- } else {
- Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.getPackageId}") ! shellLoger
- }
- }
-
- private def installAnacondaOnNode(): Unit = {
- // TODO: get rid of hard-coded version
-
- this.clusterConfig.mode match {
- case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLoger
- case "mesos" => Seq("sh", "miniconda.sh", "-b", "-p", "miniconda") ! shellLoger
- }
-
- Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLoger
- Seq("bash", "-c", "ln -s spark/python/pyspark miniconda/pkgs/pyspark") ! shellLoger
- }
-
- private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
- notifier.info("loading anaconda evn")
- installAnacondaOnNode()
- val codegenPackage = new PythonPackage("codegen", "", "auto")
- installAnacondaPackage(codegenPackage)
-// try {
-// // notifier.info("loadPythonDependencies #5")
-// deps.getFilePaths.foreach(pack => {
-// pack.toLowerCase match {
-// case "anaconda" => installAnacondaPackage(pack)
-// // case "pypi" => installPyPiPackage(pack)
-// }
-// })
-// }
-// catch {
-//
-// case rte: RuntimeException =>
-// val sw = new StringWriter
-// rte.printStackTrace(new PrintWriter(sw))
-// notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
-// case e: Exception =>
-// val sw = new StringWriter
-// e.printStackTrace(new PrintWriter(sw))
-// notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
-// }
- }
-
- override def getGroupIdentifier: String = "spark"
-
- override def getRunner(id: String): AmaterasuRunner = runners(id)
-
- private def getDependencies(deps: Dependencies): Seq[String] = {
-
- // adding a local repo because Aether needs one
- val repo = new File(System.getProperty("java.io.tmpdir"), "ama-repo")
-
- val remotes = deps.getRepos.map(r =>
- new RemoteRepository(
- r.getId,
- r.getType,
- r.getUrl
- )).toList.asJava
-
- val aether = new Aether(remotes, repo)
-
- deps.getArtifacts.flatMap(a => {
- aether.resolve(
- new DefaultArtifact(a.getGroupId, a.getArtifactId, "", "jar", a.getVersion),
- JavaScopes.RUNTIME
- ).map(a => a)
- }).map(x => x.getFile.getAbsolutePath)
-
- }
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkExecutionQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkExecutionQueue.scala
deleted file mode 100755
index 4a7c3a2..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkExecutionQueue.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark
-
-import java.util
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-
-class PySparkExecutionQueue {
-
- val queue = new LinkedBlockingQueue[(String, String, util.Map[String, String])]()
-
- def getNext(): (String, String, util.Map[String, String]) = {
-
- // if the queue is idle for an hour it will return null which
- // terminates the python execution, need to revisit
- queue.poll(1, TimeUnit.HOURS)
-
- }
-
- def setForExec(line: (String, String, util.Map[String, String])) = {
-
- queue.put(line)
-
- }
-
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkResultQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkResultQueue.scala
deleted file mode 100755
index 1b17a81..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkResultQueue.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark
-
-import org.apache.amaterasu.frameworks.spark.runner.pyspark.ResultType.ResultType
-
-object ResultType extends Enumeration {
- type ResultType = Value
- val success = Value("success")
- val error = Value("error")
- val completion = Value("completion")
-}
-
-case class PySparkResult(
- resultType: ResultType,
- action: String,
- statement: String,
- message: String
-)
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
deleted file mode 100644
index 24dfc8f..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark
-
-import java.io.File
-import java.util
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.commons.lang.StringUtils
-import org.apache.spark.SparkEnv
-import org.apache.spark.sql.SparkSession
-
-import scala.sys.process.{Process, ProcessLogger}
-
-
-
-
-class PySparkRunner extends Logging with AmaterasuRunner {
-
- var proc: Process = _
- var notifier: Notifier = _
-
- override def getIdentifier: String = "pyspark"
-
- override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
- interpretSources(actionSource, actionName, exports)
- }
-
- def interpretSources(source: String, actionName: String, exports: util.Map[String, String]): Unit = {
-
- PySparkEntryPoint.getExecutionQueue.setForExec((source, actionName, exports))
- val resQueue = PySparkEntryPoint.getResultQueue(actionName)
-
- notifier.info(s"================= Started action $actionName =================")
-
- var res: PySparkResult = null
-
- do {
- res = resQueue.getNext()
- res.resultType match {
- case ResultType.success =>
- notifier.success(res.statement)
- case ResultType.error =>
- notifier.error(res.statement, res.message)
- throw new Exception(res.message)
- case ResultType.completion =>
- notifier.info(s"================= finished action $actionName =================")
- }
- } while (res != null && res.resultType != ResultType.completion)
- }
-
-}
-
-object PySparkRunner {
-
- def collectCondaPackages(): String = {
- val pkgsDirs = new File("./miniconda/pkgs")
- (pkgsDirs.listFiles.filter {
- file => file.getName.endsWith(".tar.bz2")
- }.map {
- file => s"./miniconda/pkgs/${file.getName}"
- }.toBuffer ++ "dist/codegen.py").mkString(",")
- }
-
- def apply(env: Environment,
- jobId: String,
- notifier: Notifier,
- spark: SparkSession,
- pypath: String,
- pyDeps: PythonDependencies,
- config: ClusterConfig): PySparkRunner = {
-
- val shellLoger = ProcessLogger(
- (o: String) => println(o),
- (e: String) => println(e)
- )
-
- //TODO: can we make this less ugly?
-
-
- val result = new PySparkRunner
-
- PySparkEntryPoint.start(spark, jobId, env, SparkEnv.get)
- val port = PySparkEntryPoint.getPort
- var intpPath = ""
- if (env.getConfiguration.containsKey("cwd")) {
- val cwd = new File(env.getConfiguration.get("cwd").toString)
- intpPath = s"${cwd.getAbsolutePath}/spark_intp.py" // This is to support test environment
- } else {
- intpPath = s"spark_intp.py"
- }
- var pysparkPath = ""
- var condaPkgs = ""
- if (pyDeps != null)
- condaPkgs = collectCondaPackages()
- var sparkCmd: Seq[String] = Seq()
- config.mode match {
- case "yarn" =>
- pysparkPath = s"${StringUtils.stripStart(config.spark.home,"/")}/bin/spark-submit"
- sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
- val proc = Process(sparkCmd, None,
- "PYTHONPATH" -> pypath,
- "PYTHONHASHSEED" -> 0.toString)
-
- proc.run(shellLoger)
- case "mesos" =>
- pysparkPath = config.pysparkPath
- if (pysparkPath.endsWith("spark-submit")) {
- sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
- }
- else {
- sparkCmd = Seq(pysparkPath, intpPath, port.toString)
- }
- var pysparkPython = "/usr/bin/python"
-
- val proc = Process(sparkCmd, None,
- "PYTHONPATH" -> pypath,
- "PYSPARK_PYTHON" -> pysparkPython,
- "PYTHONHASHSEED" -> 0.toString)
-
- proc.run(shellLoger)
- }
-
- result.notifier = notifier
-
- result
- }
-
-}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/ResultQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/ResultQueue.scala
deleted file mode 100755
index daca0fc..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/ResultQueue.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark
-
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-
-class ResultQueue {
- val queue = new LinkedBlockingQueue[PySparkResult]()
-
- def getNext(): PySparkResult = {
-
- // if the queue is idle for an hour it will return null which
- // terminates the python execution, need to revisit
- queue.poll(10, TimeUnit.MINUTES)
-
- }
-
- def put(
- resultType: String,
- action: String,
- statement: String,
- message: String
- ) = {
-
- val result = new PySparkResult(ResultType.withName(resultType), action, statement, message)
- queue.put(result)
- }
-}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/AmaSparkILoop.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/AmaSparkILoop.scala
deleted file mode 100755
index 1f803dc..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/AmaSparkILoop.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.repl
-
-import java.io.PrintWriter
-
-import org.apache.spark.repl.SparkILoop
-
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.IMain
-
-class AmaSparkILoop(writer: PrintWriter) extends SparkILoop(None, writer) {
-
- def create = {
- this.createInterpreter
- }
-
- def setSettings(settings: Settings) = {
- this.settings = settings
- }
-
- def getIntp: IMain = {
- this.intp
- }
-
-}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
deleted file mode 100644
index 65342eb..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.repl
-
-import java.io.{ByteArrayOutputStream, File, PrintWriter}
-import java.nio.file.{Files, Paths}
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.common.utils.FileUtils
-import org.apache.commons.lang.StringUtils
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-
-import scala.tools.nsc.GenericRunnerSettings
-import scala.tools.nsc.interpreter.IMain
-
-object SparkRunnerHelper extends Logging {
-
- private val conf = new SparkConf()
- private val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
- private val outputDir = Files.createTempDirectory(Paths.get(rootDir), "repl").toFile
- outputDir.deleteOnExit()
-
- private var sparkSession: SparkSession = _
-
- var notifier: Notifier = _
-
- private var interpreter: IMain = _
-
- def getNode: String = sys.env.get("AMA_NODE") match {
- case None => "127.0.0.1"
- case _ => sys.env("AMA_NODE")
- }
-
- def getOrCreateScalaInterperter(outStream: ByteArrayOutputStream, jars: Seq[String], recreate: Boolean = false): IMain = {
- if (interpreter == null || recreate) {
- initInterpreter(outStream, jars)
- }
- interpreter
- }
-
- private def scalaOptionError(msg: String): Unit = {
- notifier.error("", msg)
- }
-
- private def initInterpreter(outStream: ByteArrayOutputStream, jars: Seq[String]): Unit = {
-
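- // build a class-based Scala REPL whose classpath includes the Spark distribution jars and the job's dependency jars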
- var result: IMain = null
- val config = new ClusterConfig()
- try {
-
- val interpArguments = List(
- "-Yrepl-class-based",
- "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
- "-classpath", jars.mkString(File.separator)
- )
-
- val settings = new GenericRunnerSettings(scalaOptionError)
- settings.processArguments(interpArguments, processAll = true)
-
- settings.classpath.append(System.getProperty("java.class.path") + java.io.File.pathSeparator +
- "spark-" + config.webserver.sparkVersion + "/jars/*" + java.io.File.pathSeparator +
- jars.mkString(java.io.File.pathSeparator))
-
- settings.usejavacp.value = true
-
- val out = new PrintWriter(outStream)
- val interpreter = new AmaSparkILoop(out)
- interpreter.setSettings(settings)
-
- interpreter.create
-
- val intp = interpreter.getIntp
-
- settings.embeddedDefaults(Thread.currentThread().getContextClassLoader)
- intp.setContextClassLoader
- intp.initializeSynchronous
-
- result = intp
- }
- catch {
- case e: Exception =>
- println(new Predef.String(outStream.toByteArray))
-
- }
-
- interpreter = result
- }
-
-
- def createSpark(env: Environment,
- sparkAppName: String,
- jars: Seq[String],
- sparkConf: Option[Map[String, Any]],
- executorEnv: Option[Map[String, Any]],
- config: ClusterConfig,
- hostName: String): SparkSession = {
-
- Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
- val minicondaPkgsPath = "miniconda/pkgs"
- val executorMinicondaDirRef = new File(minicondaPkgsPath)
- val minicondaFiles = if (executorMinicondaDirRef.exists) FileUtils.getAllFiles(executorMinicondaDirRef) else new Array[File](0)
- val pyfiles = minicondaFiles.filter(f => f.getName.endsWith(".py") ||
- f.getName.endsWith(".egg") ||
- f.getName.endsWith(".zip"))
-
- conf.setAppName(sparkAppName)
- .set("spark.driver.host", hostName)
- .set("spark.submit.deployMode", "client")
- .set("spark.hadoop.validateOutputSpecs", "false")
- .set("spark.logConf", "true")
- .set("spark.submit.pyFiles", pyfiles.mkString(","))
-
-
- val master: String = if (env.getMaster.isEmpty) {
- "yarn"
- } else {
- env.getMaster
- }
-
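- // configure Spark per deployment mode: Mesos fetches the Spark distribution from the Amaterasu web server, YARN reuses the cluster's Spark home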
- config.mode match {
-
- case "mesos" =>
- conf.set("spark.executor.uri", s"http://$getNode:${config.webserver.Port}/spark-${config.webserver.sparkVersion}.tgz")
- .setJars(jars)
- .set("spark.master", env.getMaster)
- .set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-${config.webserver.sparkVersion}")
-
- case "yarn" =>
- conf.set("spark.home", StringUtils.stripStart(config.spark.home,"/"))
- // TODO: parameterize those
- .setJars(Seq(s"executor-${config.version}-all.jar", s"spark-runner-${config.version}-all.jar", s"spark-runtime-${config.version}.jar") ++ jars)
- .set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab")
- .set("spark.driver.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64")
- .set("spark.yarn.queue", "default")
- .set("spark.history.kerberos.principal", "none")
-
- .set("spark.master", master)
- .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
- .set("spark.yarn.jars", s"${StringUtils.stripStart(config.spark.home,"/")}/jars/*")
- .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
- .set("spark.dynamicAllocation.enabled", "false")
- .set("spark.eventLog.enabled", "false")
- .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
- .set("hadoop.home.dir", config.yarn.hadoopHomeDir)
-
- case _ => throw new Exception(s"mode ${config.mode} is not legal.")
- }
-
- if (config.spark.opts != null && config.spark.opts.nonEmpty) {
- config.spark.opts.foreach(kv => {
- log.info(s"Setting ${kv._1} to ${kv._2} as specified in amaterasu.properties")
- conf.set(kv._1, kv._2)
- })
- }
-
- // adding the configurations from spark.yml
- sparkConf match {
- case Some(cnf) => {
- for (c <- cnf) {
- if (c._2.isInstanceOf[String])
- conf.set(c._1, c._2.toString)
- }
- }
- case None =>
- }
-
- // setting the executor env from spark_exec.yml
- if (executorEnv != null) {
- executorEnv match {
- case Some(env) => {
- for (c <- env) {
- if (c._2.isInstanceOf[String])
- conf.setExecutorEnv(c._1, c._2.toString)
- }
- }
- case None =>
- }
- }
- conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
-
- sparkSession = SparkSession.builder
- .appName(sparkAppName)
- .master(env.getMaster)
-
- //.enableHiveSupport()
- .config(conf).getOrCreate()
-
- sparkSession.conf.getAll.foreach(x => log.info(x.toString))
-
- val hc = sparkSession.sparkContext.hadoopConfiguration
-
- sys.env.get("AWS_ACCESS_KEY_ID") match {
- case None =>
- case _ =>
- hc.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
- hc.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
- hc.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
- }
-
- sparkSession
- }
-}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
deleted file mode 100755
index 7e089b5..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.repl
-
-import java.io.ByteArrayOutputStream
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.frameworks.spark.runtime.AmaContext
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.spark.sql.{Dataset, SparkSession}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.io.Source
-import scala.tools.nsc.interpreter.{IMain, Results}
-
-class ResHolder(var value: Any)
-
-class SparkScalaRunner(var env: Environment,
- var jobId: String,
- var interpreter: IMain,
- var outStream: ByteArrayOutputStream,
- var spark: SparkSession,
- var notifier: Notifier,
- var jars: Seq[String]) extends Logging with AmaterasuRunner {
-
- private def scalaOptionError(msg: String): Unit = {
- notifier.error("", msg)
- }
-
- override def getIdentifier = "scala"
-
- val holder = new ResHolder(null)
-
- override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
- val source = Source.fromString(actionSource)
- interpretSources(source, actionName, exports.asScala.toMap)
- }
-
- def interpretSources(source: Source, actionName: String, exports: Map[String, String]): Unit = {
-
- notifier.info(s"================= Started action $actionName =================")
- //notifier.info(s"exports is: $exports")
-
- for (line <- source.getLines()) {
-
- // ignoring empty or commented lines
- if (!line.isEmpty && !line.trim.startsWith("*") && !line.startsWith("/")) {
-
- outStream.reset()
- log.debug(line)
-
- if (line.startsWith("import")) {
- interpreter.interpret(line)
- }
- else {
- val intresult = interpreter.interpret(line)
-
- val result = interpreter.prevRequestList.last.lineRep.call("$result")
-
- // intresult: Success, Error, etc
- // result: the actual result (RDD, df, etc.) for caching
- // outStream.toString gives you the error message
- intresult match {
- case Results.Success =>
- log.debug("Results.Success")
-
- notifier.success(line)
-
- val resultName = interpreter.prevRequestList.last.termNames.last
-
- if (exports.contains(resultName.toString)) {
-
- val format = exports(resultName.toString)
-
- if (result != null) {
-
- result match {
- case ds: Dataset[_] =>
- log.debug(s"persisting DataFrame: $resultName")
- val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.getWorkingDir}/$jobId/$actionName/$resultName")"""
- val writeResult = interpreter.interpret(writeLine)
- if (writeResult != Results.Success) {
- val err = outStream.toString
- notifier.error(writeLine, err)
- log.error(s"error persisting dataset: $writeLine Failed with: $err")
- //throw new Exception(err)
- }
- log.debug(s"persisted DataFrame: $resultName")
-
- case _ => notifier.info(s"""+++> result type ${result.getClass}""")
- }
- }
- }
-
- case Results.Error =>
- log.debug("Results.Error")
- val err = outStream.toString
- notifier.error(line, err)
- throw new Exception(err)
-
- case Results.Incomplete =>
- log.debug("Results.Incomplete")
-
- }
- }
- }
- }
-
- notifier.info(s"================= finished action $actionName =================")
- }
-
- def initializeAmaContext(env: Environment): Unit = {
-
- // setting up some context :)
- val sc = this.spark.sparkContext
- val sqlContext = this.spark.sqlContext
-
- interpreter.interpret("import scala.util.control.Exception._")
- interpreter.interpret("import org.apache.spark.{ SparkContext, SparkConf }")
- interpreter.interpret("import org.apache.spark.sql.SQLContext")
- interpreter.interpret("import org.apache.spark.sql.{ Dataset, SparkSession }")
- interpreter.interpret("import org.apache.spark.sql.SaveMode")
- interpreter.interpret("import org.apache.amaterasu.frameworks.spark.runtime.AmaContext")
- interpreter.interpret("import org.apache.amaterasu.common.runtime.Environment")
-
- // creating a map (_contextStore) to hold the different spark contexts
- // in the REPL and getting a reference to it
- interpreter.interpret("var _contextStore = scala.collection.mutable.Map[String, AnyRef]()")
- val contextStore = interpreter.prevRequestList.last.lineRep.call("$result").asInstanceOf[mutable.Map[String, AnyRef]]
- AmaContext.init(spark, jobId, env)
-
- // populating the contextStore
- contextStore.put("sc", sc)
- contextStore.put("sqlContext", sqlContext)
- contextStore.put("env", env)
- contextStore.put("spark", spark)
- contextStore.put("ac", AmaContext)
-
- interpreter.interpret("val sc = _contextStore(\"sc\").asInstanceOf[SparkContext]")
- interpreter.interpret("val sqlContext = _contextStore(\"sqlContext\").asInstanceOf[SQLContext]")
- interpreter.interpret("val env = _contextStore(\"env\").asInstanceOf[Environment]")
- interpreter.interpret("val spark = _contextStore(\"spark\").asInstanceOf[SparkSession]")
- interpreter.interpret("val AmaContext = _contextStore(\"ac\").asInstanceOf[AmaContext]")
- interpreter.interpret("import sqlContext.implicits._")
-
- // initializing the AmaContext
- println(s"""AmaContext.init(sc, sqlContext ,"$jobId")""")
-
- }
-
-}
-
-object SparkScalaRunner extends Logging {
-
- def apply(env: Environment,
- jobId: String,
- spark: SparkSession,
- outStream: ByteArrayOutputStream,
- notifier: Notifier,
- jars: Seq[String]): SparkScalaRunner = {
-
- new SparkScalaRunner(env, jobId, SparkRunnerHelper.getOrCreateScalaInterperter(outStream, jars), outStream, spark, notifier, jars)
-
- }
-
-}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparkr/SparkRRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparkr/SparkRRunner.scala
deleted file mode 100644
index 2dc7b4e..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparkr/SparkRRunner.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.sparkr
-
-import java.io.ByteArrayOutputStream
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.spark.SparkContext
-
-
-class SparkRRunner extends Logging with AmaterasuRunner {
-
- override def getIdentifier = "spark-r"
-
- override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
- }
-}
-
-object SparkRRunner {
- def apply(
- env: Environment,
- jobId: String,
- sparkContext: SparkContext,
- outStream: ByteArrayOutputStream,
- notifier: Notifier,
- jars: Seq[String]
- ): SparkRRunner = {
- new SparkRRunner()
- }
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
deleted file mode 100644
index 9230bca..0000000
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.sparksql
-
-import java.io.File
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.frameworks.spark.runtime.AmaContext
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.commons.io.FilenameUtils
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-
-import scala.collection.JavaConverters._
-
-/**
- * Amaterasu currently supports JSON and PARQUET as data sources.
- * CSV data source support will be provided in later versions.
- */
-class SparkSqlRunner extends Logging with AmaterasuRunner {
- var env: Environment = _
- var notifier: Notifier = _
- var jobId: String = _
- //var actionName: String = _
- var spark: SparkSession = _
-
- /*
- Method: executeSource
- Description: when the user specifies a query in Amaterasu format, this method parses and executes the query.
- If the query is not in Amaterasu format, it is executed directly.
- @Params: query string
- */
- override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-
- notifier.info(s"================= Started action $actionName =================")
-
- if (!actionSource.isEmpty) {
-
- var result: DataFrame = null
- if (actionSource.toLowerCase.contains("amacontext")) {
-
- //Parse the incoming query
- //notifier.info(s"================= parsing the SQL query =================")
-
- val parser: List[String] = actionSource.toLowerCase.split(" ").toList
- var sqlPart1: String = ""
- var sqlPart2: String = ""
- var queryTempLen: Int = 0
-
- //get only the sql part of the query
- for (i <- 0 to parser.indexOf("from")) {
- sqlPart1 += parser(i) + " "
- }
-
- if (parser.indexOf("readas") == -1) {
- queryTempLen = parser.length - 1
- }
- else
- queryTempLen = parser.length - 3
-
- for (i <- parser.indexOf("from") + 1 to queryTempLen) {
- if (!parser(i).contains("amacontext"))
- sqlPart2 += " " + parser(i)
- }
-
- //If no read format is specified by the user, use PARQUET as the default file format to load data
- var fileFormat: String = null
- //if the "readas" keyword is not present, set PARQUET as the default read format
- if (parser.indexOf("readas") == -1) {
- fileFormat = "parquet"
- }
- else
- fileFormat = parser(parser.indexOf("readas") + 1)
-
-
- val locationPath: String = parser.filter(word => word.contains("amacontext")).mkString("")
- val directories = locationPath.split("_")
- val actionName = directories(1)
- val dfName = directories(2)
- val parsedQuery = sqlPart1 + locationPath + sqlPart2
-
- //Load the dataframe from the previous action
- val loadData: DataFrame = AmaContext.getDataFrame(actionName, dfName, fileFormat)
- loadData.createOrReplaceTempView(locationPath)
-
-
- try{
-
- result = spark.sql(parsedQuery)
- notifier.success(parsedQuery)
- } catch {
- case e: Exception => notifier.error(parsedQuery, e.getMessage)
- }
-
- }
- else {
-
- notifier.info("Executing SparkSql on: " + actionSource)
-
- result = spark.sql(actionSource)
- }
- val exportsBuff = exports.asScala.toBuffer
- if (exportsBuff.nonEmpty) {
- val exportName = exportsBuff.head._1
- val exportFormat = exportsBuff.head._2
- //notifier.info(s"exporting to -> ${env.workingDir}/$jobId/$actionName/$exportName")
- result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.getWorkingDir}/$jobId/$actionName/$exportName")
- }
- notifier.info(s"================= finished action $actionName =================")
- }
- }
-
- /*
- Method to find the file type of files within a directory
- @Params
- folderName : Path to location of the directory containing data-source files
- */
-
- def findFileType(folderName: File): Array[String] = {
- // get all the files from a directory
- val files: Array[File] = folderName.listFiles()
- val extensions: Array[String] = files.map(file => FilenameUtils.getExtension(file.toString))
- extensions
- }
-
- override def getIdentifier: String = "sql"
-
-}
-
-object SparkSqlRunner {
-
- def apply(env: Environment,
- jobId: String,
- // actionName: String,
- notifier: Notifier,
- spark: SparkSession): SparkSqlRunner = {
-
- val sparkSqlRunnerObj = new SparkSqlRunner
-
- sparkSqlRunnerObj.env = env
- sparkSqlRunnerObj.jobId = jobId
- //sparkSqlRunnerObj.actionName = actionName
- sparkSqlRunnerObj.notifier = notifier
- sparkSqlRunnerObj.spark = spark
- sparkSqlRunnerObj
- }
-}
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv b/frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
deleted file mode 100644
index 5f0ce0e..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
+++ /dev/null
@@ -1,4 +0,0 @@
-name,age
-sampath,22
-kirupa,30
-dev,19
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json b/frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json
deleted file mode 100644
index d297f1f..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{"name":"Sampath","age":22}
-{"name":"Kirupa", "age":30}
-{"name":"Dev", "age":19}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
deleted file mode 100644
index 7d66b64..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
deleted file mode 100644
index 74b1890..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS
deleted file mode 100644
index e69de29..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS
+++ /dev/null
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata
deleted file mode 100644
index 5d83fd6..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata
deleted file mode 100644
index ac54053..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644
index e1b0d2e..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644
index d807ba9..0000000
--- a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/amaterasu.properties b/frameworks/spark/runner/src/test/resources/amaterasu.properties
deleted file mode 100755
index e95df02..0000000
--- a/frameworks/spark/runner/src/test/resources/amaterasu.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-zk=127.0.0.1
-version=0.2.0-incubating
-master=192.168.33.11
-user=root
-mode=mesos
-webserver.port=8000
-webserver.root=dist
-spark.version=2.1.1-bin-hadoop2.7
-pysparkPath = /usr/bin/python
diff --git a/frameworks/spark/runner/src/test/resources/codegen.py b/frameworks/spark/runner/src/test/resources/codegen.py
deleted file mode 100644
index 113d9be..0000000
--- a/frameworks/spark/runner/src/test/resources/codegen.py
+++ /dev/null
@@ -1,577 +0,0 @@
-"""
- codegen
- ~~~~~~~
-
- Extension to ast that allows ast -> python code generation.
-
- :copyright: Copyright 2008 by Armin Ronacher.
- :license: BSD.
-"""
-from ast import *
-
-BINOP_SYMBOLS = {}
-BINOP_SYMBOLS[Add] = '+'
-BINOP_SYMBOLS[Sub] = '-'
-BINOP_SYMBOLS[Mult] = '*'
-BINOP_SYMBOLS[Div] = '/'
-BINOP_SYMBOLS[Mod] = '%'
-BINOP_SYMBOLS[Pow] = '**'
-BINOP_SYMBOLS[LShift] = '<<'
-BINOP_SYMBOLS[RShift] = '>>'
-BINOP_SYMBOLS[BitOr] = '|'
-BINOP_SYMBOLS[BitXor] = '^'
-BINOP_SYMBOLS[BitAnd] = '&'
-BINOP_SYMBOLS[FloorDiv] = '//'
-
-BOOLOP_SYMBOLS = {}
-BOOLOP_SYMBOLS[And] = 'and'
-BOOLOP_SYMBOLS[Or] = 'or'
-
-CMPOP_SYMBOLS = {}
-CMPOP_SYMBOLS[Eq] = '=='
-CMPOP_SYMBOLS[NotEq] = '!='
-CMPOP_SYMBOLS[Lt] = '<'
-CMPOP_SYMBOLS[LtE] = '<='
-CMPOP_SYMBOLS[Gt] = '>'
-CMPOP_SYMBOLS[GtE] = '>='
-CMPOP_SYMBOLS[Is] = 'is'
-CMPOP_SYMBOLS[IsNot] = 'is not'
-CMPOP_SYMBOLS[In] = 'in'
-CMPOP_SYMBOLS[NotIn] = 'not in'
-
-UNARYOP_SYMBOLS = {}
-UNARYOP_SYMBOLS[Invert] = '~'
-UNARYOP_SYMBOLS[Not] = 'not'
-UNARYOP_SYMBOLS[UAdd] = '+'
-UNARYOP_SYMBOLS[USub] = '-'
-
-
-def to_source(node, indent_with=' ' * 4, add_line_information=False):
- """This function can convert a node tree back into python sourcecode.
- This is useful for debugging purposes, especially if you're dealing with
- custom asts not generated by python itself.
-
- It could be that the sourcecode is evaluable when the AST itself is not
- compilable / evaluable. The reason for this is that the AST contains some
- more data than regular sourcecode does, which is dropped during
- conversion.
-
- Each level of indentation is replaced with `indent_with`. Per default this
- parameter is equal to four spaces as suggested by PEP 8, but it might be
- adjusted to match the application's styleguide.
-
- If `add_line_information` is set to `True` comments for the line numbers
- of the nodes are added to the output. This can be used to spot wrong line
- number information of statement nodes.
- """
- generator = SourceGenerator(indent_with, add_line_information)
- generator.visit(node)
-
- return ''.join(generator.result)
-
-class SourceGenerator(NodeVisitor):
- """This visitor is able to transform a well formed syntax tree into python
- sourcecode. For more details have a look at the docstring of the
- `node_to_source` function.
- """
-
- def __init__(self, indent_with, add_line_information=False):
- self.result = []
- self.indent_with = indent_with
- self.add_line_information = add_line_information
- self.indentation = 0
- self.new_lines = 0
-
- def write(self, x):
- if self.new_lines:
- if self.result:
- self.result.append('\n' * self.new_lines)
- self.result.append(self.indent_with * self.indentation)
- self.new_lines = 0
- self.result.append(x)
-
- def newline(self, node=None, extra=0):
- self.new_lines = max(self.new_lines, 1 + extra)
- if node is not None and self.add_line_information:
- self.write('# line: %s' % node.lineno)
- self.new_lines = 1
-
- def body(self, statements):
- self.new_line = True
- self.indentation += 1
- for stmt in statements:
- self.visit(stmt)
- self.indentation -= 1
-
- def body_or_else(self, node):
- self.body(node.body)
- if node.orelse:
- self.newline()
- self.write('else:')
- self.body(node.orelse)
-
- def signature(self, node):
- want_comma = []
- def write_comma():
- if want_comma:
- self.write(', ')
- else:
- want_comma.append(True)
-
- padding = [None] * (len(node.args) - len(node.defaults))
- for arg, default in zip(node.args, padding + node.defaults):
- write_comma()
- self.visit(arg)
- if default is not None:
- self.write('=')
- self.visit(default)
- if node.vararg is not None:
- write_comma()
- self.write('*' + node.vararg)
- if node.kwarg is not None:
- write_comma()
- self.write('**' + node.kwarg)
-
- def decorators(self, node):
- for decorator in node.decorator_list:
- self.newline(decorator)
- self.write('@')
- self.visit(decorator)
-
- # Statements
-
- def visit_Assert(self, node):
- self.newline(node)
- self.write('assert ')
- self.visit(node.test)
- if node.msg is not None:
- self.write(', ')
- self.visit(node.msg)
-
- def visit_Assign(self, node):
- self.newline(node)
- for idx, target in enumerate(node.targets):
- if idx:
- self.write(', ')
- self.visit(target)
- self.write(' = ')
- self.visit(node.value)
-
- def visit_AugAssign(self, node):
- self.newline(node)
- self.visit(node.target)
- self.write(' ' + BINOP_SYMBOLS[type(node.op)] + '= ')
- self.visit(node.value)
-
- def visit_ImportFrom(self, node):
- self.newline(node)
- self.write('from %s%s import ' % ('.' * node.level, node.module))
- for idx, item in enumerate(node.names):
- if idx:
- self.write(', ')
- self.write(item)
-
- def visit_Import(self, node):
- self.newline(node)
- for item in node.names:
- self.write('import ')
- self.visit(item)
-
- def visit_Expr(self, node):
- self.newline(node)
- self.generic_visit(node)
-
- def visit_FunctionDef(self, node):
- self.newline(extra=1)
- self.decorators(node)
- self.newline(node)
- self.write('def %s(' % node.name)
- self.visit(node.args)
- self.write('):')
- self.body(node.body)
-
- def visit_ClassDef(self, node):
- have_args = []
- def paren_or_comma():
- if have_args:
- self.write(', ')
- else:
- have_args.append(True)
- self.write('(')
-
- self.newline(extra=2)
- self.decorators(node)
- self.newline(node)
- self.write('class %s' % node.name)
- for base in node.bases:
- paren_or_comma()
- self.visit(base)
- # XXX: the if here is used to keep this module compatible
- # with python 2.6.
- if hasattr(node, 'keywords'):
- for keyword in node.keywords:
- paren_or_comma()
- self.write(keyword.arg + '=')
- self.visit(keyword.value)
- if node.starargs is not None:
- paren_or_comma()
- self.write('*')
- self.visit(node.starargs)
- if node.kwargs is not None:
- paren_or_comma()
- self.write('**')
- self.visit(node.kwargs)
- self.write(have_args and '):' or ':')
- self.body(node.body)
-
- def visit_If(self, node):
- self.newline(node)
- self.write('if ')
- self.visit(node.test)
- self.write(':')
- self.body(node.body)
- while True:
- else_ = node.orelse
- if len(else_) == 0:
- break
- elif len(else_) == 1 and isinstance(else_[0], If):
- node = else_[0]
- self.newline()
- self.write('elif ')
- self.visit(node.test)
- self.write(':')
- self.body(node.body)
- else:
- self.newline()
- self.write('else:')
- self.body(else_)
- break
-
- def visit_For(self, node):
- self.newline(node)
- self.write('for ')
- self.visit(node.target)
- self.write(' in ')
- self.visit(node.iter)
- self.write(':')
- self.body_or_else(node)
-
- def visit_While(self, node):
- self.newline(node)
- self.write('while ')
- self.visit(node.test)
- self.write(':')
- self.body_or_else(node)
-
- def visit_With(self, node):
- self.newline(node)
- self.write('with ')
- self.visit(node.context_expr)
- if node.optional_vars is not None:
- self.write(' as ')
- self.visit(node.optional_vars)
- self.write(':')
- self.body(node.body)
-
- def visit_Pass(self, node):
- self.newline(node)
- self.write('pass')
-
- def visit_Print(self, node):
- # XXX: python 2.6 only
- self.newline(node)
- self.write('print ')
- want_comma = False
- if node.dest is not None:
- self.write(' >> ')
- self.visit(node.dest)
- want_comma = True
- for value in node.values:
- if want_comma:
- self.write(', ')
- self.visit(value)
- want_comma = True
- if not node.nl:
- self.write(',')
-
- def visit_Delete(self, node):
- self.newline(node)
- self.write('del ')
- for idx, target in enumerate(node):
- if idx:
- self.write(', ')
- self.visit(target)
-
- def visit_TryExcept(self, node):
- self.newline(node)
- self.write('try:')
- self.body(node.body)
- for handler in node.handlers:
- self.visit(handler)
-
- def visit_TryFinally(self, node):
- self.newline(node)
- self.write('try:')
- self.body(node.body)
- self.newline(node)
- self.write('finally:')
- self.body(node.finalbody)
-
- def visit_Global(self, node):
- self.newline(node)
- self.write('global ' + ', '.join(node.names))
-
- def visit_Nonlocal(self, node):
- self.newline(node)
- self.write('nonlocal ' + ', '.join(node.names))
-
- def visit_Return(self, node):
- self.newline(node)
- if node.value is None:
- self.write('return')
- else:
- self.write('return ')
- self.visit(node.value)
-
- def visit_Break(self, node):
- self.newline(node)
- self.write('break')
-
- def visit_Continue(self, node):
- self.newline(node)
- self.write('continue')
-
- def visit_Raise(self, node):
- # XXX: Python 2.6 / 3.0 compatibility
- self.newline(node)
- self.write('raise')
- if hasattr(node, 'exc') and node.exc is not None:
- self.write(' ')
- self.visit(node.exc)
- if node.cause is not None:
- self.write(' from ')
- self.visit(node.cause)
- elif hasattr(node, 'type') and node.type is not None:
- self.visit(node.type)
- if node.inst is not None:
- self.write(', ')
- self.visit(node.inst)
- if node.tback is not None:
- self.write(', ')
- self.visit(node.tback)
-
- # Expressions
-
- def visit_Attribute(self, node):
- self.visit(node.value)
- self.write('.' + node.attr)
-
- def visit_Call(self, node):
- want_comma = []
- def write_comma():
- if want_comma:
- self.write(', ')
- else:
- want_comma.append(True)
-
- self.visit(node.func)
- self.write('(')
- for arg in node.args:
- write_comma()
- self.visit(arg)
- for keyword in node.keywords:
- write_comma()
- self.write(keyword.arg + '=')
- self.visit(keyword.value)
- if node.starargs is not None:
- write_comma()
- self.write('*')
- self.visit(node.starargs)
- if node.kwargs is not None:
- write_comma()
- self.write('**')
- self.visit(node.kwargs)
- self.write(')')
-
- def visit_Name(self, node):
- self.write(node.id)
-
- def visit_Str(self, node):
- self.write(repr(node.s))
-
- def visit_Bytes(self, node):
- self.write(repr(node.s))
-
- def visit_Num(self, node):
- self.write(repr(node.n))
-
- def visit_Tuple(self, node):
- self.write('(')
- idx = -1
- for idx, item in enumerate(node.elts):
- if idx:
- self.write(', ')
- self.visit(item)
- self.write(idx and ')' or ',)')
-
- def sequence_visit(left, right):
- def visit(self, node):
- self.write(left)
- for idx, item in enumerate(node.elts):
- if idx:
- self.write(', ')
- self.visit(item)
- self.write(right)
- return visit
-
- visit_List = sequence_visit('[', ']')
- visit_Set = sequence_visit('{', '}')
- del sequence_visit
-
- def visit_Dict(self, node):
- self.write('{')
- for idx, (key, value) in enumerate(zip(node.keys, node.values)):
- if idx:
- self.write(', ')
- self.visit(key)
- self.write(': ')
- self.visit(value)
- self.write('}')
-
- def visit_BinOp(self, node):
- self.visit(node.left)
- self.write(' %s ' % BINOP_SYMBOLS[type(node.op)])
- self.visit(node.right)
-
- def visit_BoolOp(self, node):
- self.write('(')
- for idx, value in enumerate(node.values):
- if idx:
- self.write(' %s ' % BOOLOP_SYMBOLS[type(node.op)])
- self.visit(value)
- self.write(')')
-
- def visit_Compare(self, node):
- self.write('(')
- self.visit(node.left)
- for op, right in zip(node.ops, node.comparators):
- self.write(' %s ' % CMPOP_SYMBOLS[type(op)])
- self.visit(right)
- self.write(')')
-
- def visit_UnaryOp(self, node):
- self.write('(')
- op = UNARYOP_SYMBOLS[type(node.op)]
- self.write(op)
- if op == 'not':
- self.write(' ')
- self.visit(node.operand)
- self.write(')')
-
- def visit_Subscript(self, node):
- self.visit(node.value)
- self.write('[')
- self.visit(node.slice)
- self.write(']')
-
- def visit_Slice(self, node):
- if node.lower is not None:
- self.visit(node.lower)
- self.write(':')
- if node.upper is not None:
- self.visit(node.upper)
- if node.step is not None:
- self.write(':')
- if not (isinstance(node.step, Name) and node.step.id == 'None'):
- self.visit(node.step)
-
- def visit_ExtSlice(self, node):
- for idx, item in node.dims:
- if idx:
- self.write(', ')
- self.visit(item)
-
- def visit_Yield(self, node):
- self.write('yield ')
- self.visit(node.value)
-
- def visit_Lambda(self, node):
- self.write('lambda ')
- self.visit(node.args)
- self.write(': ')
- self.visit(node.body)
-
- def visit_Ellipsis(self, node):
- self.write('Ellipsis')
-
- def generator_visit(left, right):
- def visit(self, node):
- self.write(left)
- self.visit(node.elt)
- for comprehension in node.generators:
- self.visit(comprehension)
- self.write(right)
- return visit
-
- visit_ListComp = generator_visit('[', ']')
- visit_GeneratorExp = generator_visit('(', ')')
- visit_SetComp = generator_visit('{', '}')
- del generator_visit
-
- def visit_DictComp(self, node):
- self.write('{')
- self.visit(node.key)
- self.write(': ')
- self.visit(node.value)
- for comprehension in node.generators:
- self.visit(comprehension)
- self.write('}')
-
- def visit_IfExp(self, node):
- self.visit(node.body)
- self.write(' if ')
- self.visit(node.test)
- self.write(' else ')
- self.visit(node.orelse)
-
- def visit_Starred(self, node):
- self.write('*')
- self.visit(node.value)
-
- def visit_Repr(self, node):
- # XXX: python 2.6 only
- self.write('`')
- self.visit(node.value)
- self.write('`')
-
- # Helper Nodes
-
- def visit_alias(self, node):
- self.write(node.name)
- if node.asname is not None:
- self.write(' as ' + node.asname)
-
- def visit_comprehension(self, node):
- self.write(' for ')
- self.visit(node.target)
- self.write(' in ')
- self.visit(node.iter)
- if node.ifs:
- for if_ in node.ifs:
- self.write(' if ')
- self.visit(if_)
-
- def visit_excepthandler(self, node):
- self.newline(node)
- self.write('except')
- if node.type is not None:
- self.write(' ')
- self.visit(node.type)
- if node.name is not None:
- self.write(' as ')
- self.visit(node.name)
- self.write(':')
- self.body(node.body)
-
- def visit_arguments(self, node):
- self.signature(node)
diff --git a/frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip b/frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip
deleted file mode 100644
index 8c3829e..0000000
--- a/frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/py4j.tar.gz b/frameworks/spark/runner/src/test/resources/py4j.tar.gz
deleted file mode 100644
index 761a0af..0000000
--- a/frameworks/spark/runner/src/test/resources/py4j.tar.gz
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py b/frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py
deleted file mode 100755
index c940eea..0000000
--- a/frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class AmaContext(object):
-
- def __init__(self, sc, spark, job_id, env):
- self.sc = sc
- self.spark = spark
- self.job_id = job_id
- self.env = env
-
- def get_dataframe(self, action_name, dataset_name, format = "parquet"):
- return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
-
-class Environment(object):
-
- def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
- self.name = name
- self.master = master
- self.input_root_path = input_root_path
- self.output_root_path = output_root_path
- self.working_dir = working_dir
- self.configuration = configuration
-
-data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
-rdd = ama_context.sc.parallelize(data)
-odd = rdd.filter(lambda num: num % 2 != 0)
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/pyspark.tar.gz b/frameworks/spark/runner/src/test/resources/pyspark.tar.gz
deleted file mode 100644
index 6f25984..0000000
--- a/frameworks/spark/runner/src/test/resources/pyspark.tar.gz
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/pyspark.zip b/frameworks/spark/runner/src/test/resources/pyspark.zip
deleted file mode 100644
index a624c9f..0000000
--- a/frameworks/spark/runner/src/test/resources/pyspark.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/runtime.py b/frameworks/spark/runner/src/test/resources/runtime.py
deleted file mode 100644
index d01664c..0000000
--- a/frameworks/spark/runner/src/test/resources/runtime.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
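-# minimal Python mirror of AmaContext used by the test scripts: wraps the SparkContext
-# and SparkSession and loads dataframes exported by previous actions from the job's working directory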
-class AmaContext(object):
-
- def __init__(self, sc, spark, job_id, env):
- self.sc = sc
- self.spark = spark
- self.job_id = job_id
- self.env = env
-
- def get_dataframe(self, action_name, dataset_name, format = "parquet"):
- return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
-
-class Environment(object):
-
- def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
- self.name = name
- self.master = master
- self.input_root_path = input_root_path
- self.output_root_path = output_root_path
- self.working_dir = working_dir
- self.configuration = configuration
diff --git a/frameworks/spark/runner/src/test/resources/simple-pyspark.py b/frameworks/spark/runner/src/test/resources/simple-pyspark.py
deleted file mode 100755
index 923f81c..0000000
--- a/frameworks/spark/runner/src/test/resources/simple-pyspark.py
+++ /dev/null
@@ -1,26 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
-try:
- rdd = sc.parallelize(data)
-
- def g(x):
- print(x)
-
- rdd.foreach(g)
-except Exception as e:
- print type(e), e
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/simple-python-err.py b/frameworks/spark/runner/src/test/resources/simple-python-err.py
deleted file mode 100755
index dff1491..0000000
--- a/frameworks/spark/runner/src/test/resources/simple-python-err.py
+++ /dev/null
@@ -1,21 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5]
-1/0
-
-with open('/tmp/amatest-in.txt', 'a') as the_file:
- the_file.write('hi there\n') # python will convert \n to os.linesep
diff --git a/frameworks/spark/runner/src/test/resources/simple-python.py b/frameworks/spark/runner/src/test/resources/simple-python.py
deleted file mode 100755
index 0ac6f85..0000000
--- a/frameworks/spark/runner/src/test/resources/simple-python.py
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5]
-print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
-print(data)
-
-with open('/tmp/amatest-in.txt', 'a') as the_file:
- the_file.write('hi there\n') # python will convert \n to os.linesep
diff --git a/frameworks/spark/runner/src/test/resources/simple-spark.scala b/frameworks/spark/runner/src/test/resources/simple-spark.scala
deleted file mode 100755
index f2e49fd..0000000
--- a/frameworks/spark/runner/src/test/resources/simple-spark.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.spark.sql.{DataFrame, SaveMode}
-
-val data = Seq(1,3,4,5,6)
-
-
-val sc = AmaContext.sc
-val rdd = sc.parallelize(data)
-val sqlContext = AmaContext.spark
-val x: DataFrame = rdd.toDF()
-
-x.write.mode(SaveMode.Overwrite)
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/spark-version-info.properties b/frameworks/spark/runner/src/test/resources/spark-version-info.properties
deleted file mode 100644
index 8410a21..0000000
--- a/frameworks/spark/runner/src/test/resources/spark-version-info.properties
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-version=2.1.0-SNAPSHOT
-
-user=root
-
-revision=738b4cc548ca48c010b682b8bc19a2f7e1947cfe
-
-branch=master
-
-date=2016-07-27T11:23:21Z
-
-url=https://github.com/apache/spark.git
diff --git a/frameworks/spark/runner/src/test/resources/spark_intp.py b/frameworks/spark/runner/src/test/resources/spark_intp.py
deleted file mode 100755
index fd8dc0e..0000000
--- a/frameworks/spark/runner/src/test/resources/spark_intp.py
+++ /dev/null
@@ -1,110 +0,0 @@
-#!/usr/bin/python
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import ast
-import codegen
-import os
-import sys
-import zipimport
-from runtime import AmaContext, Environment
-
-os.chdir(os.getcwd() + '/build/resources/test/')
-import zipfile
-zip = zipfile.ZipFile('pyspark.zip')
-zip.extractall()
-zip = zipfile.ZipFile('py4j-0.10.4-src.zip', 'r')
-zip.extractall()
-sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
-sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
-sys.path.append(os.getcwd())
-
-# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
-# py4j_importer = zipimport.zipimporter(py4j_path)
-# py4j = py4j_importer.load_module('py4j')
-from py4j.java_gateway import JavaGateway, GatewayClient, java_import
-from py4j.protocol import Py4JJavaError
-from pyspark.conf import SparkConf
-from pyspark.context import SparkContext
-from pyspark.rdd import RDD
-from pyspark.files import SparkFiles
-from pyspark.storagelevel import StorageLevel
-from pyspark import accumulators
-from pyspark.accumulators import Accumulator, AccumulatorParam
-from pyspark.broadcast import Broadcast
-from pyspark.serializers import MarshalSerializer, PickleSerializer
-from pyspark.sql import SparkSession
-from pyspark.sql import Row
-
-client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client, auto_convert=True)
-entry_point = gateway.entry_point
-queue = entry_point.getExecutionQueue()
-
-java_import(gateway.jvm, "org.apache.spark.SparkEnv")
-java_import(gateway.jvm, "org.apache.spark.SparkConf")
-java_import(gateway.jvm, "org.apache.spark.api.java.*")
-java_import(gateway.jvm, "org.apache.spark.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.sql.*")
-java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
-java_import(gateway.jvm, "scala.Tuple2")
-
-jconf = entry_point.getSparkConf()
-jsc = entry_point.getJavaSparkContext()
-
-job_id = entry_point.getJobId()
-javaEnv = entry_point.getEnv()
-
-env = Environment(javaEnv.name(), javaEnv.master(), javaEnv.inputRootPath(), javaEnv.outputRootPath(), javaEnv.workingDir(), javaEnv.configuration())
-conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
-conf.setExecutorEnv('PYTHONPATH', ':'.join(sys.path))
-sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-spark = SparkSession(sc, entry_point.getSparkSession())
-
-ama_context = AmaContext(sc, spark, job_id, env)
-
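-# main loop: pull the next action's source from the JVM execution queue, run each
-# top-level statement, persist exported datasets, and report results back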
-while True:
- actionData = queue.getNext()
- resultQueue = entry_point.getResultQueue(actionData._2())
- actionSource = actionData._1()
- tree = ast.parse(actionSource)
- exports = actionData._3()
-
- for node in tree.body:
-
- wrapper = ast.Module(body=[node])
- try:
- co = compile(wrapper, "<ast>", 'exec')
- exec (co)
- resultQueue.put('success', actionData._2(), codegen.to_source(node), '')
-
- #if this node is an assignment, we need to check if it needs to be persisted
- try:
- persistCode = ''
- if(isinstance(node,ast.Assign)):
- varName = node.targets[0].id
- if(exports.containsKey(varName)):
- persistCode = varName + ".write.save(\"" + env.working_dir + "/" + job_id + "/" + actionData._2() + "/" + varName + "\", format=\"" + exports[varName] + "\", mode='overwrite')"
- persist = compile(persistCode, '<stdin>', 'exec')
- exec(persist)
-
- except:
- resultQueue.put('error', actionData._2(), persistCode, str(sys.exc_info()[1]))
- except:
- resultQueue.put('error', actionData._2(), codegen.to_source(node), str(sys.exc_info()[1]))
- resultQueue.put('completion', '', '', '')
\ No newline at end of file
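The deleted interpreter script above drives everything through a Py4J gateway: the JVM entry point hands it an execution queue of (source, action name, exports) items, and a per-action result queue that receives a 'success', 'error' or 'completion' entry for each executed statement. As a rough illustration only, this is what that JVM-side contract might look like in Scala; every name below is inferred from the calls in the script (getExecutionQueue, getResultQueue, getJobId, put) and is not taken from the actual Amaterasu sources.

```scala
// Hedged sketch (assumed names) of the JVM-side contract the deleted
// spark_intp-style script talks to over Py4J. Not the real Amaterasu API.
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// One unit of work: the action's source, its name, and the variables to persist.
case class ActionData(source: String, name: String, exports: Map[String, String])

// Mirrors resultQueue.put('success' | 'error' | 'completion', action, statement, message)
trait ResultQueue {
  def put(status: String, action: String, statement: String, message: String): Unit
}

trait InterpreterEntryPoint {
  def getExecutionQueue: BlockingQueue[ActionData]
  def getResultQueue(actionName: String): ResultQueue
  def getJobId: String
}

// Throwaway implementation for local experiments: actions come from an in-memory
// queue and results are simply printed.
class InMemoryEntryPoint extends InterpreterEntryPoint {
  private val queue = new LinkedBlockingQueue[ActionData]()

  override def getExecutionQueue: BlockingQueue[ActionData] = queue

  override def getResultQueue(actionName: String): ResultQueue = new ResultQueue {
    override def put(status: String, action: String, statement: String, message: String): Unit =
      println(s"[$actionName] $status in $action: $statement $message")
  }

  override def getJobId: String = "test-job"
}
```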
diff --git a/frameworks/spark/runner/src/test/resources/step-2.scala b/frameworks/spark/runner/src/test/resources/step-2.scala
deleted file mode 100755
index 86fd048..0000000
--- a/frameworks/spark/runner/src/test/resources/step-2.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-val highNoDf = AmaContext.getDataFrame("start", "x").where("age > 20")
-highNoDf.show
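The deleted step-2.scala only makes sense together with the action that precedes it: the previous action persists a dataframe named x under the job's working directory, and step-2 pulls it back through AmaContext by (action, dataframe) name. A self-contained sketch of that hand-off follows, assuming the <workingDir>/<jobId>/<action>/<dataframe> layout and the default parquet format used elsewhere in these tests; the paths and the job id are illustrative only.

```scala
// Sketch of the cross-action hand-off exercised by the deleted step-2.scala.
// AmaContext.init/getDataFrame and the Environment setters are taken from the
// deleted test sources; the local paths and job id are made up for illustration.
import org.apache.amaterasu.common.runtime.Environment
import org.apache.amaterasu.frameworks.spark.runtime.AmaContext
import org.apache.spark.sql.SparkSession

object CrossActionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("ama-demo").getOrCreate()
    import spark.implicits._

    // Environment wiring, as in the deleted SparkTestsSuite
    val env = new Environment()
    env.setMaster("local[1]")
    env.setWorkingDir("file:///tmp/ama-demo")

    // "start" action: persist a dataframe named x under <workingDir>/<jobId>/<action>/<name>
    val x = Seq(("Michael", 22), ("Andy", 30)).toDF("name", "age")
    x.write.mode("overwrite").parquet(s"${env.getWorkingDir}/job/start/x")

    // "step-2" action: read it back through AmaContext, as the deleted step does
    AmaContext.init(spark, "job", env)
    val highNoDf = AmaContext.getDataFrame("start", "x").where("age > 20")
    highNoDf.show()
  }
}
```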
diff --git a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644
index e1b0d2e..0000000
--- a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644
index d807ba9..0000000
--- a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/RunnersLoadingTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/RunnersLoadingTests.scala
deleted file mode 100644
index dff1e4a..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/RunnersLoadingTests.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner
-
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.scalatest._
-
-@DoNotDiscover
-class RunnersLoadingTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
- var env: Environment = _
- var factory: ProvidersFactory = _
-
- "RunnersFactory" should "be loaded with all the implementations of AmaterasuRunner in its classpath" in {
- val r = factory.getRunner("spark", "scala")
- r should not be null
- }
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/SparkTestsSuite.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/SparkTestsSuite.scala
deleted file mode 100644
index d940b2f..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/SparkTestsSuite.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner
-
-import java.io.{ByteArrayOutputStream, File}
-
-import org.apache.amaterasu.common.dataobjects.{Artifact, ExecData, Repo}
-import org.apache.amaterasu.common.execution.dependencies._
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.frameworks.spark.runner.pyspark.PySparkRunnerTests
-import org.apache.amaterasu.frameworks.spark.runner.repl.{SparkScalaRunner, SparkScalaRunnerTests}
-import org.apache.amaterasu.frameworks.spark.runner.sparksql.SparkSqlRunnerTests
-import org.apache.amaterasu.utilities.TestNotifier
-import org.apache.spark.sql.SparkSession
-import org.scalatest._
-
-import scala.collection.mutable.ListBuffer
-import scala.collection.JavaConverters._
-
-class SparkTestsSuite extends Suites(
- //new PySparkRunnerTests,
- new RunnersLoadingTests,
- new SparkSqlRunnerTests,
- new SparkScalaRunnerTests
-) with BeforeAndAfterAll {
-
- var env: Environment = _
- var factory: ProvidersFactory = _
- var spark: SparkSession = _
-
- private def createTestMiniconda(): Unit = {
- println(s"PATH: ${new File(".").getAbsolutePath}")
- new File("miniconda/pkgs").mkdirs()
- }
-
- override def beforeAll(): Unit = {
-
- // I can't apologise enough for this
- val resources = new File(getClass.getResource("/spark_intp.py").getPath).getParent
- val workDir = new File(resources).getParentFile.getParent
-
- env = new Environment()
- env.setWorkingDir(s"file://$workDir")
-
- env.setMaster("local[1]")
- if (env.getConfiguration != null) env.setConfiguration(Map("pysparkPath" -> "/usr/bin/python").asJava) else env.setConfiguration( Map(
- "pysparkPath" -> "/usr/bin/python",
- "cwd" -> resources
- ).asJava)
-
- val excEnv = Map[String, Any](
- "PYTHONPATH" -> resources
- )
- createTestMiniconda()
- env.setConfiguration(Map( "spark_exec_env" -> excEnv).asJava)
- factory = ProvidersFactory(new ExecData(env,
- new Dependencies(ListBuffer.empty[Repo].asJava, List.empty[Artifact].asJava),
- new PythonDependencies(Array.empty[String]),
- Map(
- "spark" -> Map.empty[String, Any].asJava,
- "spark_exec_env" -> Map("PYTHONPATH" -> resources).asJava).asJava),
- "test",
- new ByteArrayOutputStream(),
- new TestNotifier(),
- "test",
- "localhost",
- getClass.getClassLoader.getResource("amaterasu.properties").getPath)
- spark = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner].spark
-
- this.nestedSuites.filter(s => s.isInstanceOf[RunnersLoadingTests]).foreach(s => s.asInstanceOf[RunnersLoadingTests].factory = factory)
- this.nestedSuites.filter(s => s.isInstanceOf[PySparkRunnerTests]).foreach(s => s.asInstanceOf[PySparkRunnerTests].factory = factory)
- this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].factory = factory)
- this.nestedSuites.filter(s => s.isInstanceOf[SparkScalaRunnerTests]).foreach(s => s.asInstanceOf[SparkScalaRunnerTests].factory = factory)
- this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].env = env)
-
- super.beforeAll()
- }
-
- override def afterAll(): Unit = {
- new File("miniconda").delete()
- spark.stop()
-
- super.afterAll()
- }
-
-}
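The deleted SparkTestsSuite builds one ProvidersFactory and Environment in beforeAll and pushes them into its nested @DoNotDiscover specs by assigning their vars. The same fixture-injection pattern in a minimal, generic ScalaTest sketch; the names here are invented for illustration and are not Amaterasu's.

```scala
// Generic ScalaTest sketch of the fixture-injection pattern used by the deleted
// SparkTestsSuite: the outer Suites builds one expensive fixture and assigns it
// into nested @DoNotDiscover specs before they run.
import org.scalatest._

@DoNotDiscover
class UsesSharedFixture extends FlatSpec with Matchers {
  var fixture: String = _ // injected by the enclosing suite

  "the shared fixture" should "be available to nested specs" in {
    fixture should not be null
  }
}

class WiringSuite extends Suites(new UsesSharedFixture) with BeforeAndAfterAll {
  override def beforeAll(): Unit = {
    val shared = "expensive resource built once" // e.g. a SparkSession or factory
    nestedSuites
      .collect { case s: UsesSharedFixture => s }
      .foreach(_.fixture = shared)
    super.beforeAll()
  }
}
```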
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunnerTests.scala
deleted file mode 100755
index 1ed029f..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunnerTests.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.pyspark
-
-import java.io.File
-
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-import scala.io.Source
-
-@DoNotDiscover
-class PySparkRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
- Logger.getLogger("org").setLevel(Level.OFF)
- Logger.getLogger("akka").setLevel(Level.OFF)
- Logger.getLogger("spark").setLevel(Level.OFF)
- Logger.getLogger("jetty").setLevel(Level.OFF)
- Logger.getRootLogger.setLevel(Level.OFF)
-
- var factory: ProvidersFactory = _
-
- def delete(file: File) {
- if (file.isDirectory)
- Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete(_))
- file.delete
- }
-
- override protected def afterAll(): Unit = {
- val pysparkDir = new File(getClass.getResource("/pyspark").getPath)
- val py4jDir = new File(getClass.getResource("/py4j").getPath)
- delete(pysparkDir)
- delete(py4jDir)
- super.afterAll()
- }
-
-
- "PySparkRunner.executeSource" should "execute simple python code" in {
- val src = Source.fromFile(getClass.getResource("/simple-python.py").getPath).mkString
- var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
- println("3333333333333333333333")
- runner.executeSource(src, "test_action1", Map.empty[String, String].asJava)
- }
-
- it should "print and trows an errors" in {
- a[java.lang.Exception] should be thrownBy {
- val src = Source.fromFile(getClass.getResource("/simple-python-err.py").getPath).mkString
- var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
- runner.executeSource(src, "test_action2", Map.empty[String, String].asJava)
- }
- }
-
- it should "also execute spark code written in python" in {
- val src = Source.fromFile(getClass.getResource("/simple-pyspark.py").getPath).mkString
- var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
- runner.executeSource(src, "test_action3", Map("numDS" -> "parquet").asJava)
- }
-
- it should "also execute spark code written in python with AmaContext being used" in {
- val src = Source.fromFile(getClass.getResource("/pyspark-with-amacontext.py").getPath).mkString
- var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
- runner.executeSource(src, "test_action4", Map.empty[String, String].asJava)
- }
-
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunnerTests.scala
deleted file mode 100755
index 011c4d9..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunnerTests.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.repl
-
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.frameworks.spark.runtime.AmaContext
-import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-import scala.io.Source
-
-@DoNotDiscover
-class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
- var factory: ProvidersFactory = _
- var runner: SparkScalaRunner = _
-
-
- "SparkScalaRunner" should "execute the simple-spark.scala" in {
-
-
- val sparkRunner =factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
- val script = getClass.getResource("/simple-spark.scala").getPath
- val sourceCode = Source.fromFile(script).getLines().mkString("\n")
- sparkRunner.executeSource(sourceCode, "start", Map.empty[String, String].asJava)
-
- }
-
- "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in {
-
- val sparkRunner =factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
- val script = getClass.getResource("/step-2.scala").getPath
- sparkRunner.env.setWorkingDir(s"${getClass.getResource("/tmp").getPath}")
- AmaContext.init(sparkRunner.spark,"job",sparkRunner.env)
- val sourceCode = Source.fromFile(script).getLines().mkString("\n")
- sparkRunner.executeSource(sourceCode, "cont", Map.empty[String, String].asJava)
-
- }
-
-
-}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunnerTests.scala
deleted file mode 100644
index 756bed9..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunnerTests.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.runner.sparksql
-
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.utilities.TestNotifier
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-
-@DoNotDiscover
-class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
- Logger.getLogger("org").setLevel(Level.OFF)
- Logger.getLogger("akka").setLevel(Level.OFF)
- Logger.getLogger("spark").setLevel(Level.OFF)
- Logger.getLogger("jetty").setLevel(Level.OFF)
- Logger.getRootLogger.setLevel(Level.OFF)
-
-
- val notifier = new TestNotifier()
-
- var factory: ProvidersFactory = _
- var env: Environment = _
-
- /*
- Test whether parquet is used as default file format to load data from previous actions
- */
-
- "SparkSql" should "load data as parquet if no input foramt is specified" in {
-
- val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
- val spark: SparkSession = sparkSql.spark
-
- //Prepare test dataset
- val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
-
- inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqldefaultparquetjobaction/sparksqldefaultparquetjobactiontempdf")
- sparkSql.executeSource("select * FROM AMACONTEXT_sparksqldefaultparquetjobaction_sparksqldefaultparquetjobactiontempdf where age=22", "sql_parquet_test", Map("result" -> "parquet").asJava)
-
- val outputDf = spark.read.parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_parquet_test/result")
- println("Output Default Parquet: " + inputDf.count + "," + outputDf.first().getString(1))
- outputDf.first().getString(1) shouldEqual "Michael"
- }
-
- /*
- Test whether the parquet data is successfully parsed, loaded and processed by SparkSQL
- */
-
- "SparkSql" should "load PARQUET data directly from previous action's dataframe and persist the Data in working directory" in {
-
- val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
- val spark: SparkSession = sparkSql.spark
-
- //Prepare test dataset
- val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
- inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqlparquetjobaction/sparksqlparquetjobactiontempdf")
- sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlparquetjobaction_sparksqlparquetjobactiontempdf READAS parquet", "sql_parquet_test", Map("result2" -> "parquet").asJava)
-
- val outputDf = spark.read.parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_parquet_test/result2")
- println("Output Parquet: " + inputDf.count + "," + outputDf.count)
- inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
- }
-
-
- /*
- Test whether the JSON data is successfully parsed, loaded by SparkSQL
- */
-
- "SparkSql" should "load JSON data directly from previous action's dataframe and persist the Data in working directory" in {
-
- val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
- val spark: SparkSession = sparkSql.spark
-
- //Prepare test dataset
- val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath)
-
- inputDf.write.mode(SaveMode.Overwrite).json(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqljsonjobaction/sparksqljsonjobactiontempdf")
- sparkSql.executeSource("select * FROM AMACONTEXT_sparksqljsonjobaction_sparksqljsonjobactiontempdf where age='30' READAS json", "sql_json_test", Map("result" -> "json").asJava)
-
- val outputDf = spark.read.json(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_json_test/result")
- println("Output JSON: " + inputDf.count + "," + outputDf.count)
- outputDf.first().getString(1) shouldEqual "Kirupa"
-
- }
-
- /*
- Test whether the CSV data is successfully parsed, loaded by SparkSQL
- */
-
- "SparkSql" should "load CSV data directly from previous action's dataframe and persist the Data in working directory" in {
-
- val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
- val spark: SparkSession = sparkSql.spark
-
- //Prepare test dataset
- val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath)
- inputDf.write.mode(SaveMode.Overwrite).csv(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqlcsvjobaction/sparksqlcsvjobactiontempdf")
- sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlcsvjobaction_sparksqlcsvjobactiontempdf READAS csv", "sql_csv_test", Map("result" -> "csv").asJava)
-
-
- val outputDf = spark.read.csv(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_csv_test/result")
- println("Output CSV: " + inputDf.count + "," + outputDf.count)
- inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
- }
-
- /*
- Test whether the data can be directly read from a file and executed by sparkSql
- */
-// "SparkSql" should "load data directly from a file and persist the Data in working directory" in {
-//
-// val tempFileEnv = Environment()
-// tempFileEnv.workingDir = "file:/tmp/"
-// AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv)
-//
-// val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", notifier, spark)
-// sparkSql.executeSource("SELECT * FROM parquet.`" + getClass.getResource("/SparkSql/parquet").getPath + "`", "sql_parquet_file_test", Map("result" -> "parquet").asJava)
-// val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
-// println("Output Parquet dataframe: " + outputParquetDf.show)
-// outputParquetDf.first().getString(1) shouldEqual "Michael"
-// sparkSql.executeSource("SELECT * FROM json.`" + getClass.getResource("/SparkSql/json").getPath + "`","sql_parquet_file_test", Map("result" -> "json").asJava)
-//
-// val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
-// println("Output Json dataframe: " + outputJsonDf.show)
-// outputJsonDf.first().getString(1) shouldEqual "Sampath"
-//
-// }
-
-
-}
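Each of the deleted SparkSql tests follows the same recipe: stage a dataframe under <workingDir>/<jobId>/<action>/<dataframe>, query it through a table named AMACONTEXT_<action>_<dataframe> (optionally with a READAS <format> hint), then read the persisted result back from the export location. A condensed sketch of one such round trip; how the SparkSqlRunner instance is obtained is left out, and the action names and paths are illustrative.

```scala
// Condensed sketch of the round trip the deleted SparkSqlRunnerTests exercise.
// The AMACONTEXT_<action>_<dataframe> table name and the READAS hint come from
// the deleted tests; the input path and action names are made up for illustration.
import org.apache.amaterasu.frameworks.spark.runner.sparksql.SparkSqlRunner
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.JavaConverters._

object SparkSqlRoundTrip {
  // `sparkSql` would come from a ProvidersFactory in the real tests
  def run(sparkSql: SparkSqlRunner, spark: SparkSession, workingDir: String): Unit = {
    // 1. Stage input where a previous action ("jsonaction") would have written its "jsondf"
    val input = spark.read.json("src/test/resources/SparkSql/json") // illustrative path
    input.write.mode(SaveMode.Overwrite)
      .json(s"$workingDir/${sparkSql.jobId}/jsonaction/jsondf")

    // 2. Query it via the AMACONTEXT_<action>_<dataframe> table, hinting the source format
    sparkSql.executeSource(
      "select * FROM AMACONTEXT_jsonaction_jsondf where age='30' READAS json",
      "sql_json_demo",
      Map("result" -> "json").asJava)

    // 3. The runner persists the exported result under <workingDir>/<jobId>/<action>/<export>
    val output = spark.read.json(s"$workingDir/${sparkSql.jobId}/sql_json_demo/result")
    output.show()
  }
}
```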
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
deleted file mode 100644
index 6a9e9f0..0000000
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.utilities
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-
-
-class TestNotifier extends Notifier {
-
- override def info(msg: String): Unit = {
- getLog.info(msg)
- }
-
- override def success(line: String): Unit = {
- getLog.info(s"successfully executed line: $line")
- }
-
- override def error(line: String, msg: String): Unit = {
- getLog.error(s"Error executing line: $line message: $msg")
- }
-}
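The deleted TestNotifier simply forwards runner feedback to the log. For assertions it can be more convenient to capture that feedback instead; a small sketch of such a notifier, assuming only the three callbacks visible in the file above.

```scala
// Sketch of a Notifier that records feedback for later assertions instead of
// logging it. Only the three callbacks seen in the deleted TestNotifier are assumed.
import org.apache.amaterasu.common.execution.actions.Notifier

import scala.collection.mutable.ListBuffer

class CollectingNotifier extends Notifier {
  val infos = ListBuffer.empty[String]
  val successes = ListBuffer.empty[String]
  val errors = ListBuffer.empty[(String, String)]

  override def info(msg: String): Unit = infos += msg
  override def success(line: String): Unit = successes += line
  override def error(line: String, msg: String): Unit = errors += ((line, msg))
}
```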
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index 35f3800..e89345b 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -395,7 +395,7 @@
private fun requestContainer(actionData: ActionData, capability: Resource) {
actionsBuffer.add(actionData)
- log.info("About to ask container for action ${actionData.id} with mem ${capability.memory} and cores ${capability.virtualCores}. Action buffer size is: ${actionsBuffer.size}")
+ log.info("About to ask container for action ${actionData.id} with mem ${capability.memorySize} and cores ${capability.virtualCores}. Action buffer size is: ${actionsBuffer.size}")
// we have an action to schedule, let's request a container
val priority: Priority = Records.newRecord(Priority::class.java)
diff --git a/settings.gradle b/settings.gradle
index e41445d..9e757dc 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -40,8 +40,6 @@
// Frameworks
// Spark
-include 'spark-runner'
-project(':spark-runner').projectDir=file("frameworks/spark/runner")
include 'spark-runtime'
project(':spark-runtime').projectDir=file("frameworks/spark/runtime")
include 'spark-dispatcher'