Merge branch 'master' into version-0.2.0-incubating-rc3
diff --git a/.gitignore b/.gitignore
index a64ebd0..fabb847 100755
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
### Scala ###
*.class
*.log
diff --git a/.travis.yml b/.travis.yml
index 0613a29..fa3b8d3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
language: scala
cache:
directories:
@@ -7,12 +25,13 @@
before_install:
- chmod +x gradlew
script:
-- "./gradlew buildDistribution"
+- "./gradlew clean test buildDistribution"
deploy:
provider: releases
api_key:
secure: LgUGiVEjuFmAepc1XDIEPovYTh6zqn+/q/HXW0AZfwv4RoxROuoXFS8AgG6y1PY3QnbfoxCa2FsvVBwP3MupJyWlMKzZBWp9I1bFEbf4Zmn7Lb5hBJieWrgwvMN/R31YybiA9wBlhL32uN1JN3dvNQIIqwGjRzBS4Hm6db+RtnKiZFTn8p5eB5loyEu32QIlx9PWTLPSVZ/ZEqJlxFE7B5XBRCiy4+JQavFZ9fgg2DmuH5erqBGpv5W/Bn0ekKnj9NA4QaoksNnQSFx+MtL+mvJN/jACZpHF2ACjtMr01KoAUIKC2G8cnX2HVp/CBw2qG5UtTNnlh/ECpG/dtZfKaRmbZhoIxy9clf/PgiBQFdGHJKLrdN2Jpc8wG4aLDM2rRQ0k7oVp0AMWXAB1oZJ7MQcZeiwyAOo/HcU298iGuETRWMghlwEHy66iDzB+3xvxzROC2mVx0cvZE7095PkxJeddKBSpYXRQcdONkNj+b46IenpEj611UqvDilmCikhpIxqFxop24/eeTFOpmREf6JRo1dhMg0e8LwuxU5d4ZiwWx2xwlrZs2wKxrHygx/R8UD08FyntK4vaY6mTfEhcFkPuGb4kUnyrLv4fFq7oH96QXzhTtfftutxMTyLL+Y2+aJTbTeQPo6wpJo3Qn5vEsodzU9ytNV1+yvuqFfPgHtw=
- file: 'build/distributions/apache-amaterasu-0.2.0-incubating.tar'
+ file_glob: true
+ file: 'build/distributions/apache-amaterasu-*.tar'
skip_cleanup: true
on:
tags: true
diff --git a/LICENSE b/LICENSE
index 13f2054..4248414 100644
--- a/LICENSE
+++ b/LICENSE
@@ -200,26 +200,28 @@
See the License for the specific language governing permissions and
limitations under the License.
-For persistence-elasticsearch/plugins/security/src/main/java/org/apache/unomi/elasticsearch/plugin/security/IPRangeMatcher.java :
+============================================================================
+ APACHE AMATERASU SUBCOMPONENTS:
-The MIT License
+ The Apache Amaterasu project contains subcomponents with separate copyright
+ notices and license terms. Your use of the source code for these
+ subcomponents is subject to the terms and conditions of the following
+ licenses.
-Copyright (c) 2013 Edin Dazdarevic (edin.dazdarevic@gmail.com)
+========================================================================
+BSD licenses
+========================================================================
+The following components are provided under the BSD license.
+See file headers and project links for details.
+The text of each license is also included at licenses/LICENSE-[project].txt.
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
+ (BSD License) codegen (https://github.com/andreif/codegen)
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
+========================================================================
+BSD 3-Clause licenses
+========================================================================
+The following components are provided under the BSD 3-Clause license.
+See file headers and project links for details.
+The text of each license is also included at licenses/LICENSE-[project].txt.
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
\ No newline at end of file
+ (BSD 3 License) Conda (https://conda.io/docs/)
\ No newline at end of file
diff --git a/ama-key b/ama-key
deleted file mode 100644
index 645e687..0000000
--- a/ama-key
+++ /dev/null
@@ -1,51 +0,0 @@
------BEGIN RSA PRIVATE KEY-----
-MIIJKQIBAAKCAgEAr2LrzGISOdddwPvua6ae7TssYVu+MBlMwv7d1fG6pusVbq6P
-Ah6HTTbObsoZ4NH3x6gvFxSC2nFp1um9Z0QlyHODUxiUO5v+p6wtuxFySORm5N03
-sjh8xJPRK3n/LhSglyD98Iy0cbGdP51r7+kZGyHk0YfUSjKicGkQAbf1V2wXBzKr
-qynjSHTppQeH5yAiym/IJuanbrpTW0H2JqA4uyV7FkxSiFYApMuHOZ3NUsPk+Evd
-2tkfhqZaw9scaG22icdzypEyDF4See+aMUfb+2WWa4G9ikVbislzTaA7di3MYeNz
-R9lI7QmA2BtqBm4oV7dninLbIrFL5Zp+70m011WdWjQ39D0bRhAfB+R5igxqzeuT
-VOeXMuTWJKo8f1jyRshr/urAB86Di5zzZK2dY9bqUUMLGcY/QU4GgB5FORVawN+K
-y6AhI+AnG/KmmuC0o0cTTkKMa2BUlY7aq7QCT+hqezM+/sOPDYm/WrtyW4Bfc/VR
-vvfc3b9TLk9aUT/2DgZHg/2/j3Odjo7ZXxAqnelxIjgOLMU4RcoI7fVu0STcbnpf
-AAD9DvwScuH0Y0q03BfTtG7BwR6Yiw5mIHoZuNq8c2NebGSYIDN1ZJGq7B5q/26z
-Ml6Rz8SfUGd0qISHntdkpvLBHxohXqC+G8b15GLWX1n2JBsrhFnM86nd6g8CAwEA
-AQKCAgEAmlzVLmiuo+vyn1Tc7jCTNjbbg8DsbocF8aXB93gvEJRdo7HNOk9GRGZV
-YFtOVXpXu4lCEO1DkiE5xyaoRghLvNY2Il/Cr4hHpKm9AiWD0bX8/bfaOmjPH3D+
-K2bPem47PWiTODGO63Yo8YGLK3ecWi4Fp4kGBlv0bj16EhknvU7sIbCuORK/8Ni9
-fztWmMzG8idaISrm+GTT0sEGdc6Uv9poMCLyjP4syN49YS+LNCooD4UueVyaC7fE
-sRbbNOpDO5apSgNq6kmtt1Zz/qXBbs1li69/8//BZzCQ5CR/0S7T8N/waa9LKR0x
-IoNWWNyBc1p/rfIS/sDPDQFicRcNKvgyVeZaEMMezYVgPZKWmLO125DUv0+PEbpp
-/yK88VQLRObTa91GKTTAqTc1aJJHqkp7Kq3jgp6d6xVa9+bJ7lg/VWhaIxY0tubX
-clcp6XmZXwsewBB3TiOdWpx4qpxnuhgvIcrUaJzJ2dREPPNsuYN/G6mGXhDEYe0T
-fw7d274bExeG/TIjre/+75s+ecoWfWvrNnPaYYPDrwk7lYAGQkFBslPe53TO/TVM
-gzdbKDRwev48cmJu3HW3+goe/K0t0OCwdWwZltG3bPJwXm/uF745pNI5HbdaK0FV
-v21eX2lJh28SVtFh8AGKw242PWR2jTfNH9Qzc5SgOL6hac9fdhECggEBANZAd65x
-a8O02FJW1jBqykMyRrhHSEboQg7xS2g+WS9WVn/mE4E69X6gX1h66oIzKu8iBHhY
-O4qRku+UbJqBcG8IFbYOuKZ+pQBke7rHrEmWec4YbtwVo0FB5HML7ZA7UlHoaPDA
-djC161Nk7cDTRxe6THlJub515cNO+CTgBbeqFb/J/m48sb4zd433zit5+rZC1dVg
-TOXdT5mQzxOhbJn3YFW3BwJdIPUBQNBvMVByFjE6yXEYQ9KOvCeEhRnGJ0CzqoFm
-zGQIOCWa8gyg2hH1PHBeS8DGr/X3BVoalJlQXr90pddJ0bfO65fx4HaXO5lqCEQ9
-qpcvlGs1AQNwVFkCggEBANGPun3JctChZ7WCoCPR16hibrLNAnMTH4BHHMuLsAEa
-exwgWAMODfaHebAkobTcUAli72TdAIx6XQ9REFDa2lydWzEDjYIKFA6vD7rWLnQq
-60KWFa2zL/rprEa0vk28DXj/WmBTtRH3E1EzGi1LNziXBtfYJJRceCGOrbGqbvwk
-v6C9popRrVaOqHFLxseMKX5jSNEdoZqVoHGQ551luVUHSxu3cVi+sgItcUTfgpqQ
-0SKXTLdYtc5cGjoEgEDxFJD/I9Y0vCgkFPFU/tz8AwBy0adJEsP68pfRpj9yXntt
-Nzkq15JF+7XK2Vt00rKr+We7fiO8Nm1mJEyMsxHBhKcCggEBALhl0dvUiHBu9IOh
-c0VGlABThCRUTXOhsEEWEdWNW8rvHxGDHqRp7yJlusn3OGCI01nvSDOflNdFRVZn
-wzUTVIZrSexgLTI266Iz2X2/HpxTI1BrHPbUtKaUpJ8T1An/1HDke3VB4Dc6S2iC
-BFKiRJy6XdlBx9iRtgdrrwxltuYFQCTKH+4W1M+jkjEg51Pp7wrw6QN1l5l66Wh9
-BoyZsVOuYj5DgYfaSWQ1COib2rCnEEyckQWCYdUVvgCxALFXJy97srMem6k4ncJX
-4h1WT3mHPNZlggNPveAPE48iM2TklDdpmNZ7FUGCmKg0qADJVqVKagT5ohnu/Gls
-vAuOoTkCggEAHmFk2umCgKZ1n4XRa3/3cMzcWYWJDl++WF122jdlC7PoFxrFR6QY
-+B2J0bRt0QeDfujd5dR4SOVQanEJGX+w2m5hkwh90lVdtQdCE4cLcwHp21xgxi7N
-DOYleJapZCGYHmt+kapw/KrCHSp4aAqYddbHQjFulCeXrt29Zp1bu6gkM8xqwXC3
-3W2PE+W1aqZyOYVxQAe4ru10NiKYwWPG64HELL96ajAzJEesPRzeFURbXVVr5MSy
-jrkhgDm40jFhFug2LM62XGdmtdnpnOXYFY+Pv13Dn6/YzZOyM06ETgZ2VA5W9Di7
-Fg7TVgPoq8hsvtIaplmZ1mBRcNuQ9kkEzwKCAQBDrQTgTizuCOozunnHDsX0XBhh
-XOkwwjsrAtMsg0nGt5EI2gAxetQ+y+aMYldPN5s1hDQ7p1f5pfVhuAPy+MM4OtGb
-SNZEAlTI3abk2f27PTak4HBwqTJNGPQh1iSpDvqFut0W92C8kkZbACwqWo9cwxIR
-GqjOTQZkglUhtVQE1xhvjjw5LQj0ORrd3csnyIKioJ82ZQA5svcYHafgCuMtsHh5
-3c7XJabDcKE6pcZ/9lSu+htUguaUKRLmuMNC8BTAb23DXgIKo1R4xuobG1Asc0za
-vzU9a6BzFN8I7hUi8UpSQOmjWh/UtPq1fGnYSzc6pk+6Abzrer1jVPtNeqtM
------END RSA PRIVATE KEY-----
diff --git a/ama-key.pub b/ama-key.pub
deleted file mode 100644
index 2348b44..0000000
--- a/ama-key.pub
+++ /dev/null
@@ -1 +0,0 @@
-ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQCvYuvMYhI5113A++5rpp7tOyxhW74wGUzC/t3V8bqm6xVuro8CHodNNs5uyhng0ffHqC8XFILacWnW6b1nRCXIc4NTGJQ7m/6nrC27EXJI5Gbk3TeyOHzEk9Eref8uFKCXIP3wjLRxsZ0/nWvv6RkbIeTRh9RKMqJwaRABt/VXbBcHMqurKeNIdOmlB4fnICLKb8gm5qduulNbQfYmoDi7JXsWTFKIVgCky4c5nc1Sw+T4S93a2R+GplrD2xxobbaJx3PKkTIMXhJ575oxR9v7ZZZrgb2KRVuKyXNNoDt2Lcxh43NH2UjtCYDYG2oGbihXt2eKctsisUvlmn7vSbTXVZ1aNDf0PRtGEB8H5HmKDGrN65NU55cy5NYkqjx/WPJGyGv+6sAHzoOLnPNkrZ1j1upRQwsZxj9BTgaAHkU5FVrA34rLoCEj4Ccb8qaa4LSjRxNOQoxrYFSVjtqrtAJP6Gp7Mz7+w48Nib9au3JbgF9z9VG+99zdv1MuT1pRP/YOBkeD/b+Pc52OjtlfECqd6XEiOA4sxThFygjt9W7RJNxuel8AAP0O/BJy4fRjSrTcF9O0bsHBHpiLDmYgehm42rxzY15sZJggM3VkkarsHmr/brMyXpHPxJ9QZ3SohIee12Sm8sEfGiFeoL4bxvXkYtZfWfYkGyuEWczzqd3qDw== whisrael@gmail.com
diff --git a/build.gradle b/build.gradle
index 7bcecd4..79b926f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -14,20 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+plugins {
+ id "org.nosphere.apache.rat" version "0.3.1"
+}
+
apply plugin: 'distribution'
allprojects {
group 'org.apache.amaterasu'
- version '0.2.0-incubating'
+ version '0.2.0-incubating-rc3'
}
project(':leader')
project(':common')
project(':executor')
+task copyLegalFiles(type: Copy) {
+ from "./DISCLAIMER", "./LICENSE", "./NOTICE"
+ into "${buildDir}/amaterasu"
+}
+
task buildHomeDir() {
- dependsOn subprojects.collect { getTasksByName('shadowJar', true) }
dependsOn subprojects.collect { getTasksByName('copyToHome', true) }
+ dependsOn copyLegalFiles
}
distributions {
@@ -46,6 +55,11 @@
dependsOn customDistTar
}
-tasks.withType(Test) {
+rat {
+ // List of exclude directives, defaults to ['**/.gradle/**']
+ excludes = ["**/build/**", '**/.gradle/**', '**/gradle/**', '**/.idea/**', '**/.ruby-version/**', '**/repo/**', '**/resources/**', '**/*.iml/**']
+}
+
+tasks.withType(Test) {
maxParallelForks = 1
}
diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index 7c9f924..3661b48 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -71,8 +71,8 @@
var memoryMB: Int = 1024
def load(props: Properties): Unit = {
- if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").asInstanceOf[Int]
- if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").asInstanceOf[Int]
+ if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").toInt
+ if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").toInt
}
}
@@ -83,8 +83,8 @@
var memoryMB: Int = 1024
def load(props: Properties): Unit = {
- if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").asInstanceOf[Int]
- if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").asInstanceOf[Int]
+ if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").toInt
+ if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").toInt
}
}
@@ -133,9 +133,9 @@
def load(props: Properties): Unit = {
- if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").asInstanceOf[Double]
- if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").asInstanceOf[Long]
- if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").asInstanceOf[Long]
+ if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").toDouble
+ if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").toLong
+ if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").toLong
Tasks.load(props)
}
@@ -148,9 +148,9 @@
def load(props: Properties): Unit = {
- if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").asInstanceOf[Int]
- if (props.containsKey("jobs.tasks.cpus")) attempts = props.getProperty("jobs.tasks.cpus").asInstanceOf[Int]
- if (props.containsKey("jobs.tasks.mem")) attempts = props.getProperty("jobs.tasks.mem").asInstanceOf[Int]
+ if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").toInt
+ if (props.containsKey("jobs.tasks.cpus")) cpus = props.getProperty("jobs.tasks.cpus").toInt
+ if (props.containsKey("jobs.tasks.mem")) mem = props.getProperty("jobs.tasks.mem").toInt
}
}
@@ -209,7 +209,7 @@
if (props.containsKey("timeout")) timeout = props.getProperty("timeout").asInstanceOf[Double]
if (props.containsKey("mode")) mode = props.getProperty("mode")
if (props.containsKey("workingFolder")) workingFolder = props.getProperty("workingFolder", s"/user/$user")
-
+ if (props.containsKey("pysparkPath")) pysparkPath = props.getProperty("pysparkPath")
// TODO: rethink this
Jar = this.getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
JarName = Paths.get(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath).getFileName.toString
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala
index ab8a28a..5fa2d74 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.common.dataobjects
import com.google.gson.Gson
diff --git a/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala b/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
index 8c000cc..9813296 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.common.utils
import java.io.File
diff --git a/executor/build.gradle b/executor/build.gradle
index a173c58..21bc2b0 100644
--- a/executor/build.gradle
+++ b/executor/build.gradle
@@ -112,6 +112,7 @@
}
task copyToHome(type: Copy) {
+ dependsOn shadowJar
from 'build/libs'
into '../build/amaterasu/dist'
from 'build/resources/main'
diff --git a/executor/src/main/resources/runtime.py b/executor/src/main/resources/runtime.py
index 874b174..d01664c 100644
--- a/executor/src/main/resources/runtime.py
+++ b/executor/src/main/resources/runtime.py
@@ -1,3 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
class AmaContext(object):
def __init__(self, sc, spark, job_id, env):
diff --git a/executor/src/main/resources/spark_intp.py b/executor/src/main/resources/spark_intp.py
index f1752a2..f3c9fc0 100755
--- a/executor/src/main/resources/spark_intp.py
+++ b/executor/src/main/resources/spark_intp.py
@@ -1,16 +1,27 @@
#!/usr/bin/python
-
-# import os
-# user_paths = os.environ['PYTHONPATH']
#
-# with open('/Users/roadan/pypath.txt', 'a') as the_file:
-# the_file.write(user_paths)
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
import ast
import codegen
import os
import sys
import zipimport
+sys.path.append(os.getcwd())
from runtime import AmaContext, Environment
# os.chdir(os.getcwd() + '/build/resources/test/')
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
index 94b8056..79fe18a 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
@@ -16,19 +16,21 @@
*/
package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
-import java.io.{File, PrintWriter, StringWriter}
+import java.io.File
import java.util
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{PythonDependencies, PythonPackage}
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
import org.apache.amaterasu.common.logging.Logging
import org.apache.amaterasu.common.runtime.Environment
import org.apache.amaterasu.sdk.AmaterasuRunner
import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
-import scala.sys.process.Process
+import scala.sys.process.{Process, ProcessLogger}
+
+
class PySparkRunner extends AmaterasuRunner with Logging {
@@ -69,6 +71,15 @@
object PySparkRunner {
+ def collectCondaPackages(): String = {
+ val pkgsDirs = new File("./miniconda/pkgs")
+ (pkgsDirs.listFiles.filter {
+ file => file.getName.endsWith(".tar.bz2")
+ }.map {
+ file => s"./miniconda/pkgs/${file.getName}"
+ }.toBuffer ++ "dist/codegen.py").mkString(",")
+ }
+
def apply(env: Environment,
jobId: String,
notifier: Notifier,
@@ -77,14 +88,13 @@
pyDeps: PythonDependencies,
config: ClusterConfig): PySparkRunner = {
- //TODO: can we make this less ugly?
- var pysparkPython = "/usr/bin/python"
+ val shellLoger = ProcessLogger(
+ (o: String) => println(o),
+ (e: String) => println(e)
+ )
- if (pyDeps != null &&
- pyDeps.packages.nonEmpty) {
- loadPythonDependencies(pyDeps, notifier)
- pysparkPython = "miniconda/bin/python"
- }
+ //TODO: can we make this less ugly?
+
val result = new PySparkRunner
@@ -98,87 +108,44 @@
intpPath = s"spark_intp.py"
}
var pysparkPath = ""
- if (env.configuration.contains("pysparkPath")) {
- pysparkPath = env.configuration("pysparkPath")
- } else {
- pysparkPath = s"${config.spark.home}/bin/spark-submit"
+ var condaPkgs = ""
+ if (pyDeps != null)
+ condaPkgs = collectCondaPackages()
+ var sparkCmd: Seq[String] = Seq()
+ config.mode match {
+ case "yarn" =>
+ pysparkPath = s"spark/bin/spark-submit"
+ sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
+ val proc = Process(sparkCmd, None,
+ "PYTHONPATH" -> pypath,
+ "PYTHONHASHSEED" -> 0.toString)
+
+ proc.run(shellLoger)
+ case "mesos" =>
+ pysparkPath = config.pysparkPath
+ if (pysparkPath.endsWith("spark-submit")) {
+ sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
+ }
+ else {
+ sparkCmd = Seq(pysparkPath, intpPath, port.toString)
+ }
+ var pysparkPython = "/usr/bin/python"
+
+ if (pyDeps != null &&
+ pyDeps.packages.nonEmpty) {
+ pysparkPython = "./miniconda/bin/python"
+ }
+ val proc = Process(sparkCmd, None,
+ "PYTHONPATH" -> pypath,
+ "PYSPARK_PYTHON" -> pysparkPython,
+ "PYTHONHASHSEED" -> 0.toString)
+
+ proc.run(shellLoger)
}
- val proc = Process(Seq(pysparkPath, intpPath, port.toString), None,
- "PYTHONPATH" -> pypath,
- "PYSPARK_PYTHON" -> pysparkPython,
- "PYTHONHASHSEED" -> 0.toString) #> System.out
-
- proc.run()
-
result.notifier = notifier
result
}
- /**
- * This installs the required python dependencies.
- * We basically need 2 packages to make pyspark work with customer's scripts:
- * 1. py4j - supplied by spark, for communication between Python and Java runtimes.
- * 2. codegen - for dynamically parsing and converting customer's scripts into executable Python code objects.
- * Currently we only know how to install packages using Anaconda, the reason is 3rd party OS libraries, e.g. libevent
- * Anaconda has the capabilities to automatically resolve the required OS libraries per Python package and install them.
- *
- * TODO - figure out if we really want to support pip directly, or if Anaconda is enough.
- * @param deps All of the customer's supplied Python dependencies, this currently comes from job-repo/deps/python.yml
- * @param notifier
- */
- private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
- notifier.info("loading anaconda evn")
- installAnacondaOnNode()
- val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
- installAnacondaPackage(codegenPackage)
- try {
- deps.packages.foreach(pack => {
- pack.index.getOrElse("anaconda").toLowerCase match {
- case "anaconda" => installAnacondaPackage(pack)
- // case "pypi" => installPyPiPackage(pack) TODO: See if we can support this
- }
- })
- }
- catch {
-
- case rte: RuntimeException =>
- val sw = new StringWriter
- rte.printStackTrace(new PrintWriter(sw))
- notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
- case e: Exception =>
- val sw = new StringWriter
- e.printStackTrace(new PrintWriter(sw))
- notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
- }
- }
-
-
- /**
- * Installs one python package using Anaconda.
- * Anaconda works with multiple channels, or better called, repositories.
- * Normally, if a channel isn't specified, Anaconda will fetch the package from the default conda channel.
- * The reason we need to use channels, is that sometimes the required package doesn't exist on the default channel.
- * @param pythonPackage This comes from parsing the python.yml dep file.
- */
- private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
- val channel = pythonPackage.channel.getOrElse("anaconda")
- if (channel == "anaconda") {
- Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}")
- } else {
- Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}")
- }
- }
-
- /**
- * Installs Anaconda and then links it with the local spark that was installed on the executor.
- */
- private def installAnacondaOnNode(): Unit = {
- Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda")
- Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build")
- Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark")
- }
-
-
}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
index ff56d8c..ba7ff03 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
@@ -47,6 +47,7 @@
)
private var conf: Option[Map[String, Any]] = _
private var executorEnv: Option[Map[String, Any]] = _
+ private var clusterConfig: ClusterConfig = _
override def init(execData: ExecData,
jobId: String,
@@ -60,7 +61,7 @@
(o: String) => log.info(o),
(e: String) => log.error("", e)
)
-
+ clusterConfig = config
var jars = Seq.empty[String]
if (execData.deps != null) {
@@ -83,9 +84,15 @@
sparkScalaRunner.initializeAmaContext(execData.env)
runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
-
+ var pypath = ""
// TODO: get rid of hard-coded version
- lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", execData.pyDeps, config)
+ config.mode match {
+ case "yarn" =>
+ pypath = s"$$PYTHONPATH:$$SPARK_HOME/python:$$SPARK_HOME/python/build:${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip:${new File(".").getAbsolutePath}"
+ case "mesos" =>
+ pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
+ }
+ lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
runners.put(pySparkRunner.getIdentifier, pySparkRunner)
lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
@@ -95,17 +102,22 @@
private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
val channel = pythonPackage.channel.getOrElse("anaconda")
if (channel == "anaconda") {
- Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
+ Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
} else {
- Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
+ Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
}
}
private def installAnacondaOnNode(): Unit = {
// TODO: get rid of hard-coded version
- Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") ! shellLoger
- Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") ! shellLoger
- Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! shellLoger
+
+ this.clusterConfig.mode match {
+ case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLoger
+ case "mesos" => Seq("sh", "Miniconda2-latest-Linux-x86_64.sh", "-b", "-p", "miniconda") ! shellLoger
+ }
+
+ Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLoger
+ Seq("bash", "-c", "ln -s spark/python/pyspark miniconda/pkgs/pyspark") ! shellLoger
}
private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
index d437778..f4f553c 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.executor.yarn.executors
import java.io.ByteArrayOutputStream
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
index abab8a4..f2c2afa 100644
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
+++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
@@ -145,16 +145,16 @@
case "yarn" =>
conf.set("spark.home", config.spark.home)
// TODO: parameterize those
- .setJars(s"executor-${config.version}-all.jar" +: jars)
+ .setJars(s"executor.jar" +: jars)
.set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab")
.set("spark.driver.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64")
.set("spark.yarn.queue", "default")
.set("spark.history.kerberos.principal", "none")
.set("spark.master", master)
- .set("spark.executor.instances", "1") // TODO: change this
+ .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
.set("spark.yarn.jars", s"spark/jars/*")
- .set("spark.executor.memory", "1g")
+ .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.eventLog.enabled", "false")
.set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
diff --git a/executor/src/test/resources/amaterasu.properties b/executor/src/test/resources/amaterasu.properties
index 19cb189..d402fed 100755
--- a/executor/src/test/resources/amaterasu.properties
+++ b/executor/src/test/resources/amaterasu.properties
@@ -6,3 +6,4 @@
webserver.port=8000
webserver.root=dist
spark.version=2.1.1-bin-hadoop2.7
+pysparkPath = /usr/bin/python
diff --git a/executor/src/test/resources/pyspark-with-amacontext.py b/executor/src/test/resources/pyspark-with-amacontext.py
index bd780a9..c940eea 100755
--- a/executor/src/test/resources/pyspark-with-amacontext.py
+++ b/executor/src/test/resources/pyspark-with-amacontext.py
@@ -1,3 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+class AmaContext(object):
+
+ def __init__(self, sc, spark, job_id, env):
+ self.sc = sc
+ self.spark = spark
+ self.job_id = job_id
+ self.env = env
+
+ def get_dataframe(self, action_name, dataset_name, format = "parquet"):
+ return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
+
+class Environment(object):
+
+ def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
+ self.name = name
+ self.master = master
+ self.input_root_path = input_root_path
+ self.output_root_path = output_root_path
+ self.working_dir = working_dir
+ self.configuration = configuration
+
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = ama_context.sc.parallelize(data)
odd = rdd.filter(lambda num: num % 2 != 0)
\ No newline at end of file
diff --git a/executor/src/test/resources/runtime.py b/executor/src/test/resources/runtime.py
index 3a90952..d01664c 100644
--- a/executor/src/test/resources/runtime.py
+++ b/executor/src/test/resources/runtime.py
@@ -1,6 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
class AmaContext(object):
- def __init__(self, sc, sqlContext):
+ def __init__(self, sc, spark, job_id, env):
self.sc = sc
- self.sqlContext = sqlContext
+ self.spark = spark
+ self.job_id = job_id
+ self.env = env
+ def get_dataframe(self, action_name, dataset_name, format = "parquet"):
+ return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
+
+class Environment(object):
+
+ def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
+ self.name = name
+ self.master = master
+ self.input_root_path = input_root_path
+ self.output_root_path = output_root_path
+ self.working_dir = working_dir
+ self.configuration = configuration
diff --git a/executor/src/test/resources/simple-pyspark.py b/executor/src/test/resources/simple-pyspark.py
index df9eb0a..923f81c 100755
--- a/executor/src/test/resources/simple-pyspark.py
+++ b/executor/src/test/resources/simple-pyspark.py
@@ -1,3 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
try:
rdd = sc.parallelize(data)
diff --git a/executor/src/test/resources/simple-python-err.py b/executor/src/test/resources/simple-python-err.py
index 14ca311..dff1491 100755
--- a/executor/src/test/resources/simple-python-err.py
+++ b/executor/src/test/resources/simple-python-err.py
@@ -1,3 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
data = [1, 2, 3, 4, 5]
1/0
diff --git a/executor/src/test/resources/simple-python.py b/executor/src/test/resources/simple-python.py
index 3f25951..0ac6f85 100755
--- a/executor/src/test/resources/simple-python.py
+++ b/executor/src/test/resources/simple-python.py
@@ -1,3 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
data = [1, 2, 3, 4, 5]
print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
print(data)
diff --git a/executor/src/test/resources/simple-spark.scala b/executor/src/test/resources/simple-spark.scala
index 34798e7..a11a458 100755
--- a/executor/src/test/resources/simple-spark.scala
+++ b/executor/src/test/resources/simple-spark.scala
@@ -1,4 +1,19 @@
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
import org.apache.amaterasu.executor.runtime.AmaContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
diff --git a/executor/src/test/resources/spark_intp.py b/executor/src/test/resources/spark_intp.py
index 6b8eaf6..fd8dc0e 100755
--- a/executor/src/test/resources/spark_intp.py
+++ b/executor/src/test/resources/spark_intp.py
@@ -1,24 +1,41 @@
#!/usr/bin/python
-
-# import os
-# user_paths = os.environ['PYTHONPATH']
#
-# with open('/Users/roadan/pypath.txt', 'a') as the_file:
-# the_file.write(user_paths)
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
import ast
import codegen
import os
import sys
-from runtime import AmaContext
+import zipimport
+from runtime import AmaContext, Environment
+
os.chdir(os.getcwd() + '/build/resources/test/')
import zipfile
zip = zipfile.ZipFile('pyspark.zip')
zip.extractall()
zip = zipfile.ZipFile('py4j-0.10.4-src.zip', 'r')
zip.extractall()
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
+sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
+sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
+sys.path.append(os.getcwd())
+
+# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
+# py4j_importer = zipimport.zipimporter(py4j_path)
+# py4j = py4j_importer.load_module('py4j')
from py4j.java_gateway import JavaGateway, GatewayClient, java_import
from py4j.protocol import Py4JJavaError
from pyspark.conf import SparkConf
@@ -31,9 +48,10 @@
from pyspark.broadcast import Broadcast
from pyspark.serializers import MarshalSerializer, PickleSerializer
from pyspark.sql import SparkSession
+from pyspark.sql import Row
client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client, auto_convert = True)
+gateway = JavaGateway(client, auto_convert=True)
entry_point = gateway.entry_point
queue = entry_point.getExecutionQueue()
@@ -49,28 +67,44 @@
jconf = entry_point.getSparkConf()
jsc = entry_point.getJavaSparkContext()
-conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
+job_id = entry_point.getJobId()
+javaEnv = entry_point.getEnv()
+env = Environment(javaEnv.name(), javaEnv.master(), javaEnv.inputRootPath(), javaEnv.outputRootPath(), javaEnv.workingDir(), javaEnv.configuration())
+conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
+conf.setExecutorEnv('PYTHONPATH', ':'.join(sys.path))
sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-
spark = SparkSession(sc, entry_point.getSparkSession())
-sqlc = spark._wrapped
-ama_context = AmaContext(sc, sqlc)
+ama_context = AmaContext(sc, spark, job_id, env)
while True:
actionData = queue.getNext()
resultQueue = entry_point.getResultQueue(actionData._2())
actionSource = actionData._1()
tree = ast.parse(actionSource)
+ exports = actionData._3()
for node in tree.body:
wrapper = ast.Module(body=[node])
try:
- co = compile(wrapper, "<ast>", 'exec')
- exec(co)
+ co = compile(wrapper, "<ast>", 'exec')
+ exec (co)
resultQueue.put('success', actionData._2(), codegen.to_source(node), '')
+
+ #if this node is an assignment, we need to check if it needs to be persisted
+ try:
+ persistCode = ''
+ if(isinstance(node,ast.Assign)):
+ varName = node.targets[0].id
+ if(exports.containsKey(varName)):
+ persistCode = varName + ".write.save(\"" + env.working_dir + "/" + job_id + "/" + actionData._2() + "/" + varName + "\", format=\"" + exports[varName] + "\", mode='overwrite')"
+ persist = compile(persistCode, '<stdin>', 'exec')
+ exec(persist)
+
+ except:
+ resultQueue.put('error', actionData._2(), persistCode, str(sys.exc_info()[1]))
except:
resultQueue.put('error', actionData._2(), codegen.to_source(node), str(sys.exc_info()[1]))
resultQueue.put('completion', '', '', '')
\ No newline at end of file
diff --git a/executor/src/test/resources/step-2.scala b/executor/src/test/resources/step-2.scala
index 189701f..a3d034c 100755
--- a/executor/src/test/resources/step-2.scala
+++ b/executor/src/test/resources/step-2.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
import org.apache.amaterasu.executor.runtime.AmaContext
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
index 68c06ce..9575205 100755
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
+++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
@@ -53,4 +53,4 @@
}
-}
+}
\ No newline at end of file
diff --git a/leader/build.gradle b/leader/build.gradle
index 8595d02..da29397 100644
--- a/leader/build.gradle
+++ b/leader/build.gradle
@@ -43,10 +43,10 @@
compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0'
compile group: 'org.apache.curator', name:'curator-test', version:'2.9.1'
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.3'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.3'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.3'
- compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.3'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4'
+ compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4'
compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.2.19.v20160908'
compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.2.19.v20160908'
compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.2.19.v20160908'
@@ -102,6 +102,7 @@
}
task copyToHomeBin(type: Copy) {
+ dependsOn shadowJar
from 'build/libs'
into '../build/amaterasu/bin'
}
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
index 731efb8..e3c2812 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
+++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -110,8 +111,9 @@
List<String> commands = Collections.singletonList(
- "env AMA_NODE=" + System.getenv("AMA_NODE") + " " +
- "$JAVA_HOME/bin/java" +
+ "env AMA_NODE=" + System.getenv("AMA_NODE") +
+ " env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().getUserName() +
+ " $JAVA_HOME/bin/java" +
" -Dscala.usejavacp=false" +
" -Xmx1G" +
" org.apache.amaterasu.leader.yarn.ApplicationMaster " +
@@ -128,6 +130,7 @@
// Setup local ama folder on hdfs.
try {
+
if (!fs.exists(jarPathQualified)) {
File home = new File(opts.home);
fs.mkdirs(jarPathQualified);
@@ -139,6 +142,7 @@
// setup frameworks
FrameworkProvidersFactory frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config);
for (String group : frameworkFactory.groups()) {
+ System.out.println("===> setting up " + group);
FrameworkSetupProvider framework = frameworkFactory.getFramework(group);
//creating a group folder
@@ -153,14 +157,15 @@
}
}
} catch (IOException e) {
+ System.out.println("===>" + e.getMessage());
LOGGER.error("Error uploading ama folder to HDFS.", e);
exit(3);
} catch (NullPointerException ne) {
+ System.out.println("===>" + ne.getMessage());
LOGGER.error("No files in home dir.", ne);
exit(4);
}
-
// get version of build
String version = config.version();
@@ -170,7 +175,6 @@
Path mergedPath = Path.mergePaths(jarPath, new Path(leaderJarPath));
// System.out.println("===> path: " + jarPathQualified);
-
LOGGER.info("Leader merged jar path is: {}", mergedPath);
LocalResource leaderJar = null;
LocalResource propFile = null;
@@ -234,7 +238,7 @@
reportBarrier.setBarrier();
reportBarrier.waitOnBarrier();
- String address = new String( client.getData().forPath("/" + newJobId + "/broker"));
+ String address = new String(client.getData().forPath("/" + newJobId + "/broker"));
System.out.println("===> " + address);
setupReportListener(address);
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
index adaeae9..3e1a67b 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.leader.execution.frameworks
import org.apache.amaterasu.common.configuration.ClusterConfig
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala
index 0fe378a..8c487c1 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.leader.frameworks.spark
import java.io.File
@@ -15,19 +31,22 @@
private var env: String = _
private var conf: ClusterConfig = _
- private val runnersResources = mutable.Map[String,Array[File]]()
- private var execData: ExecData = _
- private var sparkExecConfigurations = mutable.Map[String, Any]()
+ private val runnersResources = mutable.Map[String, Array[File]]()
+ //private var execData: ExecData = _
+ private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig
- override def init(env: String, conf: ClusterConfig): Unit = {
- this.env = env
- this.conf = conf
- this.execData = DataLoader.getExecutorData(env, conf)
+ private def loadSparkConfig: mutable.Map[String, Any] = {
+ val execData = DataLoader.getExecutorData(env, conf)
val sparkExecConfigurationsurations = execData.configurations.get("spark")
if (sparkExecConfigurationsurations.isEmpty) {
throw new Exception(s"Spark configuration files could not be loaded for the environment ${env}")
}
- this.sparkExecConfigurations = sparkExecConfigurations ++ sparkExecConfigurationsurations.get
+ collection.mutable.Map(sparkExecConfigurationsurations.get.toSeq: _*)
+ }
+
+ override def init(env: String, conf: ClusterConfig): Unit = {
+ this.env = env
+ this.conf = conf
runnersResources += "scala" -> Array.empty[File]
runnersResources += "sql" -> Array.empty[File]
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
index 0706491..8aec0f6 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.leader.mesos
import org.apache.amaterasu.common.configuration.ClusterConfig
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 86863f5..87a8f5d 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -16,10 +16,11 @@
*/
package org.apache.amaterasu.leader.mesos.schedulers
+import java.io.File
import java.util
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-import java.util.{Collections, UUID}
+import java.util.{Collections, Properties, UUID}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -50,6 +51,9 @@
*/
class JobScheduler extends AmaterasuScheduler {
+ /*private val props: Properties = new Properties(new File(""))
+ private val version = props.getProperty("version")
+ println(s"===> version $version")*/
LogManager.resetConfiguration()
private var jobManager: JobManager = _
private var client: CuratorFramework = _
@@ -166,15 +170,15 @@
val command = CommandInfo
.newBuilder
.setValue(
- s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz java -cp executor-0.2.0-incubating-all.jar:spark-${config.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor ${jobManager.jobId} ${config.master} ${actionData.name}""".stripMargin
+ s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz java -cp executor-${config.version}-all.jar:spark-${config.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor ${jobManager.jobId} ${config.master} ${actionData.name}""".stripMargin
)
-// HttpServer.getFilesInDirectory(sys.env("AMA_NODE"), config.Webserver.Port).foreach(f=>
-// )
+ // HttpServer.getFilesInDirectory(sys.env("AMA_NODE"), config.Webserver.Port).foreach(f=>
+ // )
.addUris(URI.newBuilder
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-0.2.0-incubating-all.jar")
- .setExecutable(false)
- .setExtract(false)
- .build())
+ .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
+ .setExecutable(false)
+ .setExtract(false)
+ .build())
.addUris(URI.newBuilder()
.setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-2.2.1-bin-hadoop2.7.tgz")
.setExecutable(false)
@@ -201,10 +205,10 @@
.setExtract(false)
.build())
.addUris(URI.newBuilder()
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
- .setExecutable(false)
- .setExtract(false)
- .build())
+ .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
+ .setExecutable(false)
+ .setExtract(false)
+ .build())
executor = ExecutorInfo
.newBuilder
.setData(ByteString.copyFrom(execData))
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala
index e24d979..2664665 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.leader.utilities
import javax.jms.{Message, MessageListener, TextMessage}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
index 1c86c13..c005256 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
@@ -1,6 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.leader.utilities
-
case class Args(
repo: String = "",
branch: String = "master",
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
index f15c3b1..d1d0c53 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.leader.utilities
import java.io.FileInputStream
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/MemoryFormatParser.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/MemoryFormatParser.scala
index c083bda..7aaa752 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/MemoryFormatParser.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/MemoryFormatParser.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.leader.utilities
object MemoryFormatParser {
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 3fed076..1828100 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -21,8 +21,8 @@
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-import javax.jms.Session
+import javax.jms.Session
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.broker.BrokerService
import org.apache.amaterasu.common.configuration.ClusterConfig
@@ -153,21 +153,14 @@
// TODO: awsEnv currently set to empty string. should be changed to read values from (where?).
allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar)
- rmClient = AMRMClientAsync.createAMRMClientAsync(1000, this)
- rmClient.init(conf)
- rmClient.start()
-
- // Register with ResourceManager
- log.info("Registering application")
- val registrationResponse = rmClient.registerApplicationMaster("", 0, "")
- log.info("Registered application")
+ rmClient = startRMClient()
+ val registrationResponse = registerAppMaster("", 0, "")
val maxMem = registrationResponse.getMaximumResourceCapability.getMemory
log.info("Max mem capability of resources in this cluster " + maxMem)
val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores
log.info("Max vcores capability of resources in this cluster " + maxVCores)
log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.registeredActions.size}")
-
// Resource requirements for worker containers
this.capability = Records.newRecord(classOf[Resource])
val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
@@ -194,6 +187,21 @@
log.info("Finished asking for containers")
}
+ private def startRMClient(): AMRMClientAsync[ContainerRequest] = {
+ val client = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, this)
+ client.init(conf)
+ client.start()
+ client
+ }
+
+ private def registerAppMaster(host: String, port: Int, url: String) = {
+ // Register with ResourceManager
+ log.info("Registering application")
+ val registrationResponse = rmClient.registerApplicationMaster(host, port, url)
+ log.info("Registered application")
+ registrationResponse
+ }
+
private def setupMessaging(jobId: String): Unit = {
val cf = new ActiveMQConnectionFactory(address)
@@ -225,20 +233,6 @@
override def onContainersAllocated(containers: util.List[Container]): Unit = {
- // creating the credentials for container execution
- val credentials = UserGroupInformation.getCurrentUser.getCredentials
- val dob = new DataOutputBuffer
- credentials.writeTokenStorageToStream(dob)
-
- // removing the AM->RM token so that containers cannot access it.
- val iter = credentials.getAllTokens.iterator
- log.info("Executing with tokens:")
- for (token <- iter) {
- log.info(token.toString)
- if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
- }
- val allTokens = ByteBuffer.wrap(dob.getData, 0, dob.getLength)
-
log.info(s"${containers.size()} Containers allocated")
for (container <- containers.asScala) { // Launch container by create ContainerLaunchContext
if (actionsBuffer.isEmpty) {
@@ -255,8 +249,8 @@
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
val commands: List[String] = List(
"/bin/bash ./miniconda.sh -b -p $PWD/miniconda && ",
- s"/bin/bash ${config.spark.home}/bin/load-spark-env.sh && ",
- s"java -cp ${config.spark.home}/jars/*:executor.jar:${config.spark.home}/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
+ s"/bin/bash spark/bin/load-spark-env.sh && ",
+ s"java -cp spark/jars/*:executor.jar:spark/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
"-Xmx1G " +
"-Dscala.usejavacp=true " +
"-Dhdp.version=2.6.1.0-129 " +
@@ -294,7 +288,8 @@
ctx.setEnvironment(Map[String, String](
"HADOOP_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
"YARN_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
- "AMA_NODE" -> sys.env("AMA_NODE")
+ "AMA_NODE" -> sys.env("AMA_NODE"),
+ "HADOOP_USER_NAME" -> UserGroupInformation.getCurrentUser.getUserName
))
log.info(s"hadoop conf dir is ${config.YARN.hadoopHomeDir}/conf/")
@@ -316,6 +311,22 @@
}
}
+ private def allTokens: ByteBuffer = {
+ // creating the credentials for container execution
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ val dob = new DataOutputBuffer
+ credentials.writeTokenStorageToStream(dob)
+
+ // removing the AM->RM token so that containers cannot access it.
+ val iter = credentials.getAllTokens.iterator
+ log.info("Executing with tokens:")
+ for (token <- iter) {
+ log.info(token.toString)
+ if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
+ }
+ ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+ }
+
private def setupResources(frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
val sourcePath = Path.mergePaths(jarPath, new Path(s"/$resourcesPath"))
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
index 0c1a8f0..70da38e 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
@@ -106,7 +106,7 @@
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
val command = s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")}
| env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz
- | java -cp executor-0.2.0-all.jar:spark-${config.Webserver.sparkVersion}/lib/*
+ | java -cp executor.jar:spark-${config.Webserver.sparkVersion}/lib/*
| -Dscala.usejavacp=true
| -Djava.library.path=/usr/lib org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher
| ${jobManager.jobId} ${config.master} ${actionData.name} ${gson.toJson(taskData)} ${gson.toJson(execData)}""".stripMargin
diff --git a/leader/src/main/scripts/ama-start-yarn.sh b/leader/src/main/scripts/ama-start-yarn.sh
index 6fd194a..c0f8d52 100755
--- a/leader/src/main/scripts/ama-start-yarn.sh
+++ b/leader/src/main/scripts/ama-start-yarn.sh
@@ -1,4 +1,20 @@
#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
BASEDIR=$(dirname "$0")
@@ -86,7 +102,7 @@
echo "repo: ${REPO} "
echo "force-bin: ${FORCE_BIN}"
export HADOOP_USER_CLASSPATH_FIRST=true
-CMD="yarn jar ${BASEDIR}/bin/leader-0.2.0-incubating-all.jar org.apache.amaterasu.leader.yarn.Client --home ${BASEDIR}"
+CMD="yarn jar ${BASEDIR}/bin/leader-0.2.0-incubating-rc3-all.jar org.apache.amaterasu.leader.yarn.Client --home ${BASEDIR}"
if [ -n "$REPO" ]; then
echo "repo is ${REPO}"
@@ -129,9 +145,9 @@
if [ "$FORCE_BIN" = true ] ; then
echo "FORCE: Deleting and re-creating /apps/amaterasu folder"
eval "hdfs dfs -rm -R -skipTrash /apps/amaterasu"
- eval "hdfs dfs -mkdir /apps/amaterasu/"
- eval "hdfs dfs -chmod -R 777 /apps/amaterasu/"
- eval "hdfs dfs -copyFromLocal ${BASEDIR}/* /apps/amaterasu/"
+ #eval "hdfs dfs -mkdir /apps/amaterasu/"
+ #eval "hdfs dfs -chmod -R 777 /apps/amaterasu/"
+ #eval "hdfs dfs -copyFromLocal ${BASEDIR}/* /apps/amaterasu/"
fi
eval $CMD | grep "===>"
diff --git a/leader/src/main/scripts/amaterasu.properties b/leader/src/main/scripts/amaterasu.properties
index 7961db9..5cd6638 100755
--- a/leader/src/main/scripts/amaterasu.properties
+++ b/leader/src/main/scripts/amaterasu.properties
@@ -1,5 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
zk=127.0.0.1
-version=0.2.0-incubating
+version=0.2.0-incubating-rc3
master=192.168.33.11
user=root
mode=yarn
diff --git a/leader/src/main/scripts/log4j.properties b/leader/src/main/scripts/log4j.properties
index c5e965f..742eb59 100644
--- a/leader/src/main/scripts/log4j.properties
+++ b/leader/src/main/scripts/log4j.properties
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
# Root logger option
log4j.rootLogger=DEBUG, stdout, file
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
index d1be723..ef53fa9 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.sdk.frameworks;
import org.apache.amaterasu.common.configuration.ClusterConfig;
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/configuration/DriverConfiguration.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/configuration/DriverConfiguration.java
index ff9d7c7..8fe641c 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/configuration/DriverConfiguration.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/configuration/DriverConfiguration.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.sdk.frameworks.configuration;
public class DriverConfiguration {