blob: 634f2be0f808f319bbd738e9c4706fb3a7021345 [file] [log] [blame]
#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os, sys
#, urllib2, urllib
def cleanup(args):
cmd = "hadoop fs -rm -r /tmp/%s" % args["htable_name"]
print(cmd)
ret = os.system(cmd)
print(cmd, "return", ret)
return ret
def hfile(args):
cmd = """spark-submit --class "org.apache.s2graph.loader.subscriber.TransferToHFile" \
--name "TransferToHFile@shon" \
--conf "spark.task.maxFailures=20" \
--master yarn-cluster \
--num-executors %s \
--driver-memory 1g \
--executor-memory 2g \
--executor-cores 1 \
%s \
--input %s \
--tmpPath /tmp/%s \
--zkQuorum %s \
--table %s \
--dbUrl %s \
--dbUser %s \
--dbPassword %s \
--maxHFilePerRegionServer %s \
--labelMapping %s \
--autoEdgeCreate %s""" % (args["num_executors"],
JAR,
args["input"],
args["htable_name"],
args["hbase_zk"],
args["htable_name"],
args["db_url"],
args["db_user"],
args["db_password"],
args["max_file_per_region"],
args["label_mapping"],
args["auto_create_edge"])
print(cmd)
ret = os.system(cmd)
print(cmd, "return", ret)
return ret
def distcp(args):
cmd = "hadoop distcp -overwrite -m %s -bandwidth %s /tmp/%s %s/tmp/%s" % (args["-m"], args["-bandwidth"], args["htable_name"], args["hbase_namenode"], args["htable_name"])
print(cmd)
ret = os.system(cmd)
print(cmd, "return", ret)
return ret
def chmod(args):
cmd = "export HADOOP_CONF_DIR=%s; export HADOOP_USER_NAME=hdfs; hadoop fs -chmod -R 777 /tmp/%s" % (args["HADOOP_CONF_DIR"], args["htable_name"])
print(cmd)
ret = os.system(cmd)
print(cmd, "return", ret)
return ret
def load(args):
cmd = "export HADOOP_CONF_DIR=%s; export HBASE_CONF_DIR=%s; hbase %s /tmp/%s %s" % \
(args["HADOOP_CONF_DIR"], args["HBASE_CONF_DIR"], LOADER_CLASS, args["htable_name"], args["htable_name"])
print(cmd)
ret = os.system(cmd)
print(cmd, "return", ret)
return ret
def send(msg):
print(msg)
def run(args):
cleanup(args)
send("[Start]: bulk loader")
ret = hfile(args)
if ret != 0: return send("[Failed]: loader build hfile failed %s" % ret)
else: send("[Success]: loader build hfile")
# ret = distcp(args)
#
# if ret != 0: return send("[Failed]: loader distcp failed %s" % ret)
# else: send("[Success]: loader distcp")
#
# ret = chmod(args)
#
# if ret != 0: return send("[Failed]: loader chmod failed %s" % ret)
# else: send("[Success]: loader chmod")
ret = load(args)
if ret != 0: return send("[Failed]: loader complete bulkload failed %s" % ret)
else: send("[Success]: loader complete bulkload")
LOADER_CLASS = "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles"
JAR="loader/target/scala-2.11/s2loader-assembly-0.2.1-SNAPSHOT.jar"
args = {
"HADOOP_CONF_DIR": "hdfs_conf_gasan",
"HBASE_CONF_DIR": "hbase_conf_gasan",
"htable_name": "test",
"hbase_namenode": "hdfs://localhost:8020",
"hbase_zk": "localhost",
"db_url": "jdbc:mysql://localhost:3306/graph_dev",
"db_user": "sa",
"db_password": "sa",
"max_file_per_region": 1,
"label_mapping": "none",
"auto_create_edge": "false",
"-m": 1,
"-bandwidth": 10,
"num_executors": 2,
"input": "/tmp/test.txt"
}
run(args)