# blob: 307364db00a77ad7bae734a0aff37569aa854a7a [file] [log] [blame]
# /**
# * Licensed to the Apache Software Foundation (ASF) under one
# * or more contributor license agreements. See the NOTICE file
# * distributed with this work for additional information
# * regarding copyright ownership. The ASF licenses this file
# * to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# * with the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */
import argparse
import json
from os.path import isfile
from os import getcwd
from subprocess import call, check_call
#######################################################################################################################
# I put these here so it will (hopefully) be easy(er) to bump versions / maintain
# If there is demand, we could easily make some or all of these command-line arguments as well
#######################################################################################################################
# Single source of truth for the Mahout release: the tarball name and the
# download URL are derived from it, so bumping the version is a one-line change.
mahout_version = "0.12.2"
tar_name = "apache-mahout-distribution-%s.tar.gz" % mahout_version
mahout_bin_url = "http://apache.osuosl.org/mahout/%s/%s" % (mahout_version, tar_name)

# Command-line interface. NOTE: argparse applies %-formatting to help strings,
# so a literal percent sign must be written as '%%'.
parser = argparse.ArgumentParser()
parser.add_argument("--force_download", help="force download Apache Mahout", action="store_true")
parser.add_argument("--restart_later",
                    help="do not restart Apache Zeppelin when done; restart it yourself to load the new interpreters",
                    action="store_true")
parser.add_argument("--zeppelin_home", help="path to ZEPPELIN_HOME")
parser.add_argument("--mahout_home", help="path to MAHOUT_HOME, use this if you have already installed Apache Mahout")
parser.add_argument("--overwrite_existing",
                    help="if %%sparkMahout or %%flinkMahout exist, delete them and create new ones. Otherwise Fail.",
                    action="store_true")
args = parser.parse_args()
class ZeppelinTerpWrangler:
    """Edit the interpreter settings stored in Zeppelin's ``interpreter.json``.

    The parsed JSON document is kept in ``self.interpreter_json``.  Interpreter
    settings live under its ``'interpreterSettings'`` key, keyed by interpreter
    id.  Call ``_readTerpJson()`` to load the file and ``_writeTerpJson()`` to
    persist any changes.
    """

    def __init__(self, interpreter_json_path):
        """Remember where ``interpreter.json`` lives; nothing is read yet."""
        self.interpreter_json_path = interpreter_json_path
        # Start with an empty document so `_addTerpDep` can detect the
        # "not loaded yet" state instead of failing with AttributeError.
        self.interpreter_json = {}

    def _getTerpID(self, terpName):
        """Return the id of the interpreter named `terpName`, or None if absent."""
        for terp_id, settings in self.interpreter_json['interpreterSettings'].items():
            if settings['name'] == terpName:
                return terp_id
        return None

    def _terpExists(self, terpName):
        """Return True when an interpreter named `terpName` is present."""
        return self._getTerpID(terpName) is not None

    def createTerp(self, original_terp_name, new_terp_name, overwrite_existing=True ):
        """Create interpreter `new_terp_name` as a deep copy of `original_terp_name`.

        The copy is stored under the id `new_terp_name` and has its ``name``
        and ``id`` fields set to `new_terp_name`.

        If an interpreter named `new_terp_name` already exists it is deleted
        first when `overwrite_existing` is true; otherwise the program exits
        with status 1 (SystemExit).

        Raises:
            KeyError: if no interpreter named `original_terp_name` exists.
        """
        from copy import deepcopy

        settings = self.interpreter_json['interpreterSettings']
        new_terp_id = new_terp_name

        existing_id = self._getTerpID(new_terp_name)
        if existing_id is not None:
            print("Found existing '%s' interpreter..." % new_terp_name)
            if not overwrite_existing:
                print("exiting program.")
                raise SystemExit(1)
            print("deleting %s from interpreter.json" % new_terp_name)
            del settings[existing_id]

        orig_terp_id = self._getTerpID(original_terp_name)
        if orig_terp_id is None:
            # Same exception type as the failed lookup would raise, but with a
            # message that names the missing interpreter.
            raise KeyError("interpreter '%s' not found in interpreter.json" % original_terp_name)

        new_settings = deepcopy(settings[orig_terp_id])
        new_settings['name'] = new_terp_name
        new_settings['id'] = new_terp_id
        settings[new_terp_id] = new_settings
        print("created new interpreter '%s' from interpreter '%s'" % (new_terp_name, original_terp_name))

    def _readTerpJson(self):
        """Load ``interpreter.json`` from disk into ``self.interpreter_json``."""
        with open(self.interpreter_json_path) as f:
            self.interpreter_json = json.load(f)

    def _writeTerpJson(self):
        """Write ``self.interpreter_json`` back to disk (sorted keys, 4-space indent)."""
        # Text mode: json.dump emits str, which a binary file rejects on Python 3.
        with open(self.interpreter_json_path, 'w') as f:
            json.dump(self.interpreter_json, f, sort_keys=True, indent=4)

    def _updateTerpProp(self, terpName, property, value):
        """Set property `property` of interpreter `terpName` to `value`."""
        terp_id = self._getTerpID(terpName)
        self.interpreter_json['interpreterSettings'][terp_id]['properties'][property] = value

    def _addTerpDep(self, terpName="", dep="", exclusions=None):
        """Add dependency `dep` to interpreter `terpName`, skipping duplicates.

        `dep` is stored as the ``groupArtifactVersion`` of a non-local
        dependency entry; `exclusions`, when given, is stored alongside it.
        Entries are de-duplicated on ``groupArtifactVersion`` and the first
        occurrence wins, so an already-registered dependency is left untouched.
        """
        if self.interpreter_json == {}:
            print("no interpreter.json loaded, reading last one downloaded")
            self._readTerpJson()
        terp_id = self._getTerpID(terpName)
        terp_settings = self.interpreter_json['interpreterSettings'][terp_id]

        dep_dict = {
            u'groupArtifactVersion': dep,
            u'local': False
        }
        if exclusions is not None:
            dep_dict["exclusions"] = exclusions

        # De-duplicate on the artifact coordinate itself.  Keying on an
        # arbitrary dict item (e.g. ('local', False)) would collapse distinct
        # dependencies into one.
        seen = set()
        unique_deps = []
        for d in list(terp_settings.get('dependencies', [])) + [dep_dict]:
            gav = d.get('groupArtifactVersion')
            if gav is not None:
                if gav in seen:
                    continue
                seen.add(gav)
            unique_deps.append(d)
        terp_settings['dependencies'] = unique_deps

    def addMahoutConfig(self, terpName, mahout_home, mahout_version = "0.12.2"):
        """Add the Apache Mahout jars and settings to interpreter `terpName`.

        The mahout-math jars are always added.  Names containing "spark"
        (case-insensitive) also get the Kryo serializer settings and the
        Mahout Spark jars; names containing "flink" get the task-slot setting
        plus the Mahout Flink/HDFS jars and Guava.  Jar paths are built as
        ``<mahout_home>/<artifact>-<mahout_version>.jar``.
        """
        print("updating '%s' with Apache Mahout dependencies and settings" % terpName)
        lowered_name = terpName.lower()
        # Always defined, so a name matching neither engine no longer raises
        # NameError below.
        configs = {}
        terpDeps = ["%s/mahout-math-%s.jar" % (mahout_home, mahout_version),
                    "%s/mahout-math-scala_2.10-%s.jar" % (mahout_home, mahout_version)]
        if "spark" in lowered_name:
            configs.update({
                "spark.kryo.referenceTracking": "false",
                "spark.kryo.registrator": "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator",
                "spark.kryoserializer.buffer": "32k",
                "spark.kryoserializer.buffer.max": "600m",
                "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
            })
            terpDeps.extend([
                '%s/mahout-spark_2.10-%s-dependency-reduced.jar' % (mahout_home, mahout_version),
                "%s/mahout-spark_2.10-%s.jar" % (mahout_home, mahout_version),
                "%s/mahout-spark-shell_2.10-%s.jar" % (mahout_home, mahout_version),
            ])
        if "flink" in lowered_name:
            configs.update({
                "taskmanager.numberOfTaskSlots" : "12"
            })
            terpDeps.extend([
                "%s/mahout-flink_2.10-%s.jar" % (mahout_home, mahout_version),
                "%s/mahout-hdfs-%s.jar" % (mahout_home, mahout_version),
                "com.google.guava:guava:14.0.1"
                #"%s/guava-14.0.1.jar" % mahout_home ## required in lib dir if running against cluster
            ])
        for k, v in configs.items():
            self._updateTerpProp(terpName, k, v)
        for t in terpDeps:
            self._addTerpDep(terpName, t)
#######################################################################################################################
# Need to be sure we know where Zeppelin Top directory is so we can edit conf files
#
#######################################################################################################################
def valid_zeppelin_home(path):
    """Return True when `path` contains the ``bin/zeppelin-daemon.sh`` script."""
    daemon_script = "/".join([path, "bin/zeppelin-daemon.sh"])
    return isfile(daemon_script)
# Resolve ZEPPELIN_HOME: use --zeppelin_home when given, otherwise fall back to
# the current working directory (or its parent when run from zeppelin/bin).
if args.zeppelin_home is None:
    zeppelin_home = getcwd()
    if (zeppelin_home.split("/")[-1] == "bin") and (isfile("zeppelin-daemon.sh")):
        print("we're in the zeppelin/bin")
        zeppelin_home = "/".join(zeppelin_home.split("/")[:-1])
    print("--zeppelin_home not specified, using %s" % zeppelin_home)
else:
    zeppelin_home = args.zeppelin_home

if not valid_zeppelin_home(zeppelin_home):
    print("%s does not appear to be a valid ZEPPELIN_HOME - e.g. the top level directory of the ZEPPELIN install" % zeppelin_home)
    exit(1)
else:
    print("ZEPPELIN_HOME validated")

# interpreter.json is what gets edited; if it is missing, check the daemon's
# status and, when the status command returns 1, start Zeppelin so that the
# file gets created.
interpreter_json_path = zeppelin_home + "/conf/interpreter.json"
if not isfile(interpreter_json_path):
    print("interpreter.json doesn't exist. Checking whether Zeppelin is running.")
    status = call(["bin/zeppelin-daemon.sh", 'status'], cwd=zeppelin_home)
    if status == 1:
        print("Zeppelin doesn't appear to be running- it is possible that Zeppelin has never been run (interpreter.json is created when Zeppelin is run)")
        print("I'm going to try to start Zeppelin to create interpreter.json")
        call(["bin/zeppelin-daemon.sh", 'start'], cwd=zeppelin_home)
        from time import sleep
        # Give the freshly started daemon a moment to write interpreter.json.
        sleep(3)
    else:
        print("We're in the correct top-level directory, Zeppelin appears to be running, but there is no 'interpreter.json'. "
              "\nThis is a confusing case. Please try restarting Zeppelin, but if that doesn't work reach out on the mailing list.")

if isfile(interpreter_json_path):
    z = ZeppelinTerpWrangler(interpreter_json_path)
else:
    # Report the directory actually searched; args.zeppelin_home is None when
    # the flag was omitted.
    print("'interpreter.json' not found in %s/conf" % zeppelin_home)
    exit(1)
#######################################################################################################################
# If --mahout_home not set, download and untar Mahout in to ZEPPELIN_HOME
# Set MAHOUT_HOME to ZEPPELIN_HOME/<mahout_untar_dir>
#######################################################################################################################
def download_mahout():
    """Decide whether the Mahout tarball has to be downloaded.

    Returns:
        True when a download is needed: either --force_download was given (any
        existing tarball in ZEPPELIN_HOME is deleted first), or there is
        neither a tarball in ZEPPELIN_HOME nor a --mahout_home setting.
        False when the tarball is already present or --mahout_home was given.
    """
    tar_path = "%s/%s" % (zeppelin_home, tar_name)
    if args.force_download:
        print("--force_download: OK, deleting existing tar if it exists.")
        # -f keeps rm quiet when there is no tarball to delete.
        call(["rm", "-f", tar_path])
        return True
    if isfile(tar_path):
        print("%s found, skipping download" % tar_name)
        return False
    if args.mahout_home:
        print("--mahout_home set, skipping download")
        return False
    return True
# Fetch and unpack the Mahout distribution inside ZEPPELIN_HOME when required.
if download_mahout():
    for _fetch_cmd in (['wget', mahout_bin_url], ['tar', 'xzf', tar_name]):
        check_call(_fetch_cmd, cwd=zeppelin_home)

# An explicit --mahout_home wins; otherwise use the directory named after the
# tarball with its last two dotted suffixes (".tar.gz") removed.
mahout_home = args.mahout_home or (zeppelin_home + "/" + ".".join(tar_name.split(".")[:-2]))
#######################################################################################################################
# Create new interpreters
#######################################################################################################################
# Load interpreter.json, clone the stock interpreters, attach the Mahout
# dependencies/settings to the clones, then write the file back.
z._readTerpJson()
_mahout_terps = (("spark", "sparkMahout"), ("flink", "flinkMahout"))
for _base_name, _mahout_name in _mahout_terps:
    z.createTerp(_base_name, _mahout_name, args.overwrite_existing)
for _base_name, _mahout_name in _mahout_terps:
    z.addMahoutConfig(_mahout_name, mahout_home, mahout_version)
z._writeTerpJson()
#######################################################################################################################
# Add "export MAHOUT_HOME=..." to conf/zeppelin-env.sh
# Create if doesn't exist.
#######################################################################################################################
mahout_home_str = '\nexport MAHOUT_HOME=%s\n' % (mahout_home)
zeppelin_env_sh_path = '%s/conf/zeppelin-env.sh' % zeppelin_home
if isfile(zeppelin_env_sh_path):
    # Text mode: the lines are compared with (and later written as) str.
    with open(zeppelin_env_sh_path, 'r') as f:
        zeppelin_env_sh = f.readlines()
    # A commented-out export is not an active setting, so it must not stop us
    # from adding the real one.
    if any("export MAHOUT_HOME=" in line and not line.lstrip().startswith("#")
           for line in zeppelin_env_sh):
        print("'export MAHOUT_HOME=...' already exists in zeppelin_env.sh, not appending")
    else:
        print("appending '%s' to conf/zeppelin-env.sh" % mahout_home_str)
        with open(zeppelin_env_sh_path, 'a') as f:
            f.write(mahout_home_str)
else:
    # No zeppelin-env.sh yet: create it containing just the export line.
    print("appending '%s' to conf/zeppelin-env.sh" % mahout_home_str)
    with open(zeppelin_env_sh_path, 'w') as f:
        f.write(mahout_home_str)
#######################################################################################################################
# You have to restart Apache Zeppelin for new terps to show up... do this for the user unless they specified otherwise
#
#######################################################################################################################
# Restart Zeppelin so the new interpreters are loaded, unless the user asked
# to do that themselves with --restart_later.
if args.restart_later:
    print("--restart_later flag detected: remember to restart Zeppelin to see new Mahout interpreters!!")
else:
    print("restarting Apache Zeppelin to load new interpreters...")
    check_call(["bin/zeppelin-daemon.sh", 'restart'], cwd=zeppelin_home)
#######################################################################################################################
# Good bye
#######################################################################################################################
# Closing banner.
for _farewell_line in (
        "---------------------------------------------------------------------------------------------------------------",
        "all done! Thanks for using Apache Mahout",
        "bye"):
    print(_farewell_line)