blob: f21d3a82c99789c902b94923826e3d0a5946765d [file] [log] [blame]
#!/usr/bin/env python3
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
thrift_server_conf = dict()
spark_default_conf = dict()
thrift_server_conf_path = "/etc/taihao-apps/spark-conf/spark-thriftserver.conf"
spark_default_conf_path = "/etc/taihao-apps/spark-conf/spark-defaults.conf"
hdfs_conf_path = "/etc/taihao-apps/hadoop-conf/core-site.xml"
def get_hdfs_root_path():
cf = open(hdfs_conf_path)
line = cf.readline()
hdfs_root = ""
while line:
lp = line.strip()
if "hdfs://" in lp:
hdfs_root = lp.replace("<value>", "").replace("</value>", "").strip()
break
line = cf.readline()
cf.close()
return hdfs_root
def read_conf(conf, file):
cf = open(file)
line = cf.readline()
while line:
lps = line.strip()
if lps.startswith("#") or not lps:
line = cf.readline()
continue
splits = lps.split()
key = splits[0]
value = splits[1]
conf[key] = value
line = cf.readline()
cf.close()
def read_ts_conf():
read_conf(thrift_server_conf, thrift_server_conf_path)
def read_dconf():
read_conf(spark_default_conf, spark_default_conf_path)
def save_confs(file, conf):
for (key, value) in conf.items():
file.writelines(key + " " + value + "\n")
file.close()
def set_common_spark_confs(conf):
conf["spark.eventLog.enabled"] = "true"
conf["spark.serializer"] = "org.apache.spark.serializer.KryoSerializer"
conf["spark.executor.memoryOverhead"] = "4g"
conf["spark.dynamicAllocation.enabled"] = "false"
conf["spark.executor.instances"] = "350"
conf["spark.sql.adaptive.enabled"] = "true"
conf["spark.sql.adaptive.skewJoin.enabled"] = "true"
conf["spark.shuffle.service.enabled"] = "true"
conf["spark.sql.adaptive.localShuffleReader.enabled"] = "false"
conf["spark.sql.adaptive.coalescePartitions.initialPartitionNum"] = "4096"
conf["spark.sql.hive.forceRpadString"] = "true"
if "spark.eventLog.dir" in spark_default_conf:
conf["spark.eventLog.dir"] = spark_default_conf["spark.eventLog.dir"]
def set_skew_join_confs(conf):
conf["spark.sql.adaptive.autoBroadcastJoinThreshold"] = "-1"
conf["spark.sql.adaptive.skewJoin.enabled"] = "true"
conf["spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes"] = "32m"
conf["spark.sql.autoBroadcastJoinThreshold"] = "-1"
def set_celeborn_confs(conf, replicate=False):
conf["spark.shuffle.manager"] = "org.apache.spark.shuffle.celeborn.SparkShuffleManager"
conf["spark.serializer"] = "org.apache.spark.serializer.KryoSerializer"
conf["spark.celeborn.master.endpoints"] = "master-1-1:9097"
conf["spark.shuffle.service.enabled"] = "false"
if replicate:
conf["spark.celeborn.client.push.replicate.enabled"] = "true"
else:
conf["spark.celeborn.client.push.replicate.enabled"] = "false"
def save_ess_conf(dir):
ess = open(dir + "/spark-ess.conf", "w")
nconf = thrift_server_conf.copy()
set_common_spark_confs(nconf)
save_confs(ess, nconf)
def save_celeborn_conf(dir):
celeborn = open(dir + "/spark-celeborn.conf", "w")
nconf = thrift_server_conf.copy()
set_common_spark_confs(nconf)
set_celeborn_confs(nconf, False)
save_confs(celeborn, nconf)
def save_celeborn_dup_conf(dir):
celeborndup = open(dir + "/spark-celeborn-dup.conf", "w")
nconf = thrift_server_conf.copy()
set_common_spark_confs(nconf)
set_celeborn_confs(nconf, True)
save_confs(celeborndup, nconf)
def save_skewjoin_spark_confs(dir):
celeborndup = open(dir + "/spark-skewjoin.conf", "w")
nconf = thrift_server_conf.copy()
set_common_spark_confs(nconf)
set_skew_join_confs(nconf)
set_celeborn_confs(nconf, True)
save_confs(celeborndup, nconf)
def update_spark_confs(target_dir):
print("Generated configuration output path:" + target_dir)
print("generate new spark confs")
read_ts_conf()
read_dconf()
save_ess_conf(target_dir)
save_celeborn_conf(target_dir)
save_celeborn_dup_conf(target_dir)
save_skewjoin_spark_confs(target_dir)
def merge_two_dicts(x, y):
z = x.copy()
z.update(y)
return z
def update_hibench_confs(hiben_conf_dir):
hdfs_root_path = get_hdfs_root_path()
hibench_hadoop_conf_file_path = hiben_conf_dir + "/hadoop.conf"
hibench_hadoop_conf = dict()
read_conf(hibench_hadoop_conf, hibench_hadoop_conf_file_path)
hibench_hadoop_conf["hibench.hdfs.master"] = hdfs_root_path
hibench_hadoop_conf_file_instance = open(hibench_hadoop_conf_file_path, "w")
save_confs(hibench_hadoop_conf_file_instance, hibench_hadoop_conf)
hibench_hadoop_conf_file_instance.close()
hibench_spark_conf_file = hiben_conf_dir + "/spark.conf"
hibench_spark_conf = dict()
read_conf(hibench_spark_conf, hibench_spark_conf_file)
nconf = thrift_server_conf.copy()
set_common_spark_confs(nconf)
set_celeborn_confs(nconf, True)
new_hibench_spark_conf = merge_two_dicts(nconf, hibench_spark_conf)
normal_hibench_spark_conf_file_path = hibench_spark_conf_file + ".celeborn"
save_confs(open(normal_hibench_spark_conf_file_path, "w"), new_hibench_spark_conf)
splits_hibench_spark_conf_file_path = hibench_spark_conf_file + ".split"
new_hibench_spark_conf["spark.celeborn.client.shuffle.partitionSplit.threshold"] = "16m"
save_confs(open(splits_hibench_spark_conf_file_path, "w"), new_hibench_spark_conf)
if __name__ == '__main__':
if len(sys.argv) <= 1:
print("Need regression root path")
sys.exit(-1)
rootDir = sys.argv[1]
targetDir = rootDir + "/conf"
hibenConfDir = rootDir + "/hibench3/conf"
update_spark_confs(targetDir)
update_hibench_confs(hibenConfDir)