| #!/usr/bin/python |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import os, sys |
| #, urllib2, urllib |
| |
| def cleanup(args): |
| cmd = "hadoop fs -rm -r /tmp/%s" % args["htable_name"] |
| print(cmd) |
| ret = os.system(cmd) |
| print(cmd, "return", ret) |
| return ret |
| |
| def hfile(args): |
| print(args) |
| cmd = """HADOOP_CONF_DIR=%s spark-submit --class "org.apache.s2graph.s2jobs.loader.GraphFileGenerator" \ |
| --name "GraphFileGenerator@shon" \ |
| --conf "spark.task.maxFailures=20" \ |
| --conf "spark.executor.extraClassPath=%s" \ |
| --conf "spark.driver.extraClassPath=%s" \ |
| --jars %s \ |
| --master local[2] \ |
| --deploy-mode client \ |
| --num-executors %s \ |
| --driver-memory 1g \ |
| --executor-memory 2g \ |
| --executor-cores 1 \ |
| %s \ |
| --input %s \ |
| --tempDir %s \ |
| --output %s \ |
| --zkQuorum %s \ |
| --table %s \ |
| --dbUrl '%s' \ |
| --dbUser %s \ |
| --dbPassword %s \ |
| --dbDriver %s \ |
| --method SPARK \ |
| --maxHFilePerRegionServer %s \ |
| --labelMapping %s \ |
| --autoEdgeCreate %s""" % (args["HADOOP_CONF_DIR"], |
| MYSQL_JAR, |
| MYSQL_JAR, |
| MYSQL_JAR, |
| args["num_executors"], |
| JAR, |
| args["input"], |
| args["tempDir"], |
| args["output"], |
| args["hbase_zk"], |
| args["htable_name"], |
| args["db_url"], |
| args["db_user"], |
| args["db_password"], |
| args["db_driver"], |
| args["max_file_per_region"], |
| args["label_mapping"], |
| args["auto_create_edge"]) |
| print(cmd) |
| ret = os.system(cmd) |
| print(cmd, "return", ret) |
| return ret |
| |
| def distcp(args): |
| cmd = "hadoop distcp -overwrite -m %s -bandwidth %s /tmp/%s %s/tmp/%s" % (args["-m"], args["-bandwidth"], args["htable_name"], args["hbase_namenode"], args["htable_name"]) |
| print(cmd) |
| ret = os.system(cmd) |
| print(cmd, "return", ret) |
| return ret |
| |
| def chmod(args): |
| cmd = "export HADOOP_CONF_DIR=%s; export HADOOP_USER_NAME=hdfs; hadoop fs -chmod -R 777 /tmp/%s" % (args["HADOOP_CONF_DIR"], args["htable_name"]) |
| print(cmd) |
| ret = os.system(cmd) |
| print(cmd, "return", ret) |
| return ret |
| |
| def load(args): |
| cmd = "export HADOOP_CONF_DIR=%s; export HBASE_CONF_DIR=%s; hbase %s %s %s" % \ |
| (args["HADOOP_CONF_DIR"], args["HBASE_CONF_DIR"], LOADER_CLASS, args["output"], args["htable_name"]) |
| print(cmd) |
| ret = os.system(cmd) |
| print(cmd, "return", ret) |
| return ret |
| |
| def send(msg): |
| print(msg) |
| |
| def run(args): |
| cleanup(args) |
| send("[Start]: bulk loader") |
| ret = hfile(args) |
| |
| # if ret != 0: return send("[Failed]: loader build hfile failed %s" % ret) |
| # else: send("[Success]: loader build hfile") |
| |
| # ret = distcp(args) |
| # |
| # if ret != 0: return send("[Failed]: loader distcp failed %s" % ret) |
| # else: send("[Success]: loader distcp") |
| # |
| # ret = chmod(args) |
| # |
| # if ret != 0: return send("[Failed]: loader chmod failed %s" % ret) |
| # else: send("[Success]: loader chmod") |
| |
| ret = load(args) |
| |
| # if ret != 0: return send("[Failed]: loader complete bulkload failed %s" % ret) |
| # else: send("[Success]: loader complete bulkload") |
| |
| |
| LOADER_CLASS = "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles" |
| JAR="s2jobs/target/scala-2.11/s2jobs-assembly-0.2.1-SNAPSHOT.jar" |
| MYSQL_JAR="/Users/shon/Downloads/mysql-connector-java-5.1.28.jar" |
| MYSQL_CLASSPATH="/Users/shon/Downloads/mysql-connector-java-5.1.28.jar" |
| DB_DRIVER="com.mysql.jdbc.Driver" |
| DB_URL="jdbc:mysql://localhost:3306/graph_dev" |
| # DB_URL="jdbc:h2:file:./var/metastore;MODE=MYSQL" |
| args = { |
| "HADOOP_CONF_DIR": "/usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop", |
| "HBASE_CONF_DIR": "/usr/local/Cellar/hbase/1.2.6/libexec/conf", |
| "htable_name": "s2graph", |
| "hbase_namenode": "hdfs://localhost:8020", |
| "hbase_zk": "localhost", |
| "db_driver": DB_DRIVER, |
| "db_url": DB_URL, |
| "db_user": "graph", |
| "db_password": "graph", |
| "max_file_per_region": 1, |
| "label_mapping": "none", |
| "auto_create_edge": "false", |
| "-m": 1, |
| "-bandwidth": 10, |
| "num_executors": 2, |
| "input": "/tmp/imei-20.txt", |
| "tempDir": "/tmp/bulkload_tmp", |
| "output": "/tmp/bulkload_output" |
| } |
| |
| run(args) |