#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
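"""Back up OushuDB/HAWQ metadata and user data.

The tool checkpoints the running databases, archives the master data
directory, and copies HDFS user data to a target cluster, either in full
or incrementally via HDFS snapshots and 'hadoop distcp'.
"""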
try:
    import os, sys, re
    from optparse import Option, OptionParser
    from hawqpylib.hawqlib import HawqXMLParser, parse_hosts_file, remove_property_xml, update_xml_property, local_ssh, remote_ssh
    from gppylib.commands.unix import getLocalHostname, getUserName
    from gppylib.db import dbconn
    from pygresql.pg import DatabaseError
    from gppylib.gplog import setup_hawq_tool_logging, quiet_stdout_logging, enable_verbose_logging
    from gppylib.gpsqlUtil import GpsqlUtil
except ImportError as e:
    sys.exit('ERROR: Cannot import modules. Please check that you '
             'have sourced greenplum_path.sh. Detail: ' + str(e))

logger, log_filename = setup_hawq_tool_logging('hawq_backup', getLocalHostname(), getUserName())

def parseargs():
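    """Parse the command-line options for the backup tool.

    Exactly one of --full or --incremental must be specified.  An
    illustrative invocation (the script name, snapshot name, and paths
    below are examples only):

        <this_script> --full -n snap_20240101 -m /backup/mdd -d hdfs://backupnn:8020/hawq_backup
    """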
    parser = OptionParser(usage="usage: %prog [--full | --incremental] [options]")
    parser.add_option('-f', '--full', action='store_true',
                      help="Perform a full backup of HAWQ.")
    parser.add_option('-i', '--incremental', action='store_true',
                      help="Perform an incremental backup of HAWQ.")
    parser.add_option('-n', '--snapshot', type='string',
                      help="Snapshot name for a full backup.")
    parser.add_option('-s', '--source_snapshot', type='string',
                      help="Source snapshot name for an incremental backup.")
    parser.add_option('-t', '--target_snapshot', type='string',
                      help="Target snapshot name for an incremental backup.")
    parser.add_option('-m', '--target_mdd', type='string',
                      help="Target directory for the master data directory backup.")
    parser.add_option('-d', '--target_hdfs_url', type='string',
                      help="URL of the target (backup) HDFS cluster.")
    parser.add_option('-a', '--active_target_host', type='string',
                      help="Host in the target HDFS cluster on which snapshot commands are run; "
                           "required when the target URL uses an HA nameservice.")

    (options, args) = parser.parse_args()

    if options.full and options.incremental:
        logger.error("Multiple actions specified. See the --help info.")
        sys.exit(1)

    return (options, args)


def checkpoint_databases(hawq_site):
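    """Run CHECKPOINT in every user database (everything except template0,
    template1 and hcatalog) so the on-disk state is flushed before the
    master data directory is archived.  Skipped if the cluster is offline."""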
    logger.info("Doing OushuDB checkpoints ...")
    database_port = hawq_site.hawq_dict['hawq_master_address_port']
    database_name = "template1"
    database_up = True
    databases = []
    try:
        dburl = dbconn.DbURL(port=database_port, dbname=database_name)
        conn = dbconn.connect(dburl, True)
        query = "select datname from pg_database where datname not in ('template1', 'template0', 'hcatalog') order by datname;"
        rows = dbconn.execSQL(conn, query)
        # Materialize the result before closing the connection.
        databases = [row[0] for row in rows]
        conn.close()
    except DatabaseError as ex:
        database_up = False

    if database_up:
        logger.info("OushuDB is online: doing checkpoints ...")
        for database_name in databases:
            try:
                dburl = dbconn.DbURL(port=database_port, dbname=database_name)
                conn = dbconn.connect(dburl, True)
                query = "CHECKPOINT;"
                dbconn.execSQL(conn, query)
                conn.close()
            except DatabaseError as ex:
                logger.error("Failed to connect to database %s to do checkpoint" % database_name)
                sys.exit(1)
    else:
        logger.info("OushuDB is offline: skipping checkpoints ...")

    logger.info("Done with database checkpoints")


def backup_metadata(hawq_site, snapshot, target_mdd):
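    """Archive the master data directory (excluding pg_log) into
    <snapshot>.tar.gz and move the archive into target_mdd."""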
    logger.info("Backing up metadata ...")
    mdd = hawq_site.hawq_dict['hawq_master_directory']

    cmd = "mkdir -p %s" % (target_mdd)
    result = local_ssh(cmd, logger)
    if result != 0:
        logger.error("Failed to create target master data directory")
        sys.exit(1)

    mdd_leaf = mdd.strip().strip("/").split("/")[-1]
    cmd = "cd %s/..; tar --exclude='%s/pg_log' -czf %s.tar.gz %s; mv %s.tar.gz %s" % (mdd, mdd_leaf, snapshot, mdd_leaf, snapshot, target_mdd)
    result = local_ssh(cmd, logger)
    if result != 0:
        logger.error("Failed to back up master data directory")
        sys.exit(1)
    logger.info("Done with metadata backup")


def backup_fully(hawq_site, snapshot, target_mdd, target_hdfs_url, active_target_host):
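    """Full backup: checkpoint the databases, archive the master data
    directory, snapshot the source HDFS data, distcp the snapshot to the
    target HDFS cluster, and finally snapshot the target as well."""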
    # 1) do database checkpoint
    checkpoint_databases(hawq_site)

    # 2) backup master data directory
    backup_metadata(hawq_site, snapshot, target_mdd)

    # 3) backup HDFS data
    logger.info("Backing up user data in full mode ...")
    source_hdfs_url = "hdfs://" + hawq_site.hawq_dict['hawq_dfs_url'] + "/"
    # Non-HA target URL: hdfs://<host>:<port>/...
    p1 = 'hdfs://(?P<host>[^:/ ]+):(?P<port>[0-9]*).*'
    # HA target URL: hdfs://<nameservice>/...
    p2 = 'hdfs://(?P<name>[^:/ ]+)/.*'
    m1 = re.search(p1, target_hdfs_url)
    m2 = re.search(p2, target_hdfs_url)
    is_ha_cluster = False
    if m1 is None and m2 is None:
        logger.error("Invalid URL for target HDFS cluster")
        sys.exit(1)
    elif m1 is not None and m2 is None:
        host = m1.group('host')
        port = m1.group('port')
        active_target_host = host
    elif m1 is None and m2 is not None:
        is_ha_cluster = True
        name_service = m2.group('name')
        if not active_target_host:
            logger.error("Please specify --active_target_host for an HA target HDFS cluster")
            sys.exit(1)
    else:
        logger.error("The URL for the target HDFS cluster matches both the non-HA and HA patterns")
        sys.exit(1)

    cmd = "hdfs dfsadmin -allowSnapshot %s" % (source_hdfs_url)
    result = local_ssh(cmd, logger)
    if result != 0:
        logger.error("Failed to enable snapshot on source HDFS cluster")
        sys.exit(1)

    cmd = "hdfs dfs -createSnapshot %s %s" % (source_hdfs_url, snapshot)
    result = local_ssh(cmd, logger)
    if result != 0:
        logger.error("Failed to create snapshot on source HDFS cluster")
        sys.exit(1)

    if os.environ.get('HADOOP_HOME') is None:
        logger.error("Failed to get Hadoop home directory: HADOOP_HOME is not set")
        sys.exit(1)

    cmd = "%s/bin/hadoop distcp %s/.snapshot/%s %s" % (os.environ['HADOOP_HOME'], source_hdfs_url, snapshot, target_hdfs_url)
    result = local_ssh(cmd, logger)
    if result != 0:
        logger.error("Failed to copy user data from source to target HDFS cluster")
        sys.exit(1)

    cmd = "hdfs dfsadmin -allowSnapshot %s" % (target_hdfs_url)
    result = remote_ssh(cmd, active_target_host, "")
    if result != 0:
        logger.error("Failed to enable snapshot on target HDFS cluster")
        sys.exit(1)

    cmd = "hdfs dfs -createSnapshot %s %s" % (target_hdfs_url, snapshot)
    result = remote_ssh(cmd, active_target_host, "")
    if result != 0:
        logger.error("Failed to create snapshot on target HDFS cluster")
        sys.exit(1)

    logger.info("Done with user data backup in full mode")
    logger.info("Backed up OushuDB metadata and user data successfully.")


def backup_incrementally(hawq_site, source_snapshot, target_snapshot, target_mdd, target_hdfs_url, active_target_host):
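    """Incremental backup: checkpoint the databases, archive the master
    data directory, create target_snapshot on the source HDFS cluster, and
    run 'hadoop distcp -update -diff source_snapshot target_snapshot' to
    copy only the changes between the two snapshots to the target cluster,
    which is expected to already hold source_snapshot from a previous
    backup run.  The target cluster is then snapshotted as well."""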
    # 1) do database checkpoint
    checkpoint_databases(hawq_site)

    # 2) backup master data directory
    backup_metadata(hawq_site, target_snapshot, target_mdd)

    # 3) backup HDFS data
    logger.info("Backing up user data in incremental mode ...")
    source_hdfs_url = "hdfs://" + hawq_site.hawq_dict['hawq_dfs_url'] + "/"
    # Non-HA target URL: hdfs://<host>:<port>/...
    p1 = 'hdfs://(?P<host>[^:/ ]+):(?P<port>[0-9]*).*'
    # HA target URL: hdfs://<nameservice>/...
    p2 = 'hdfs://(?P<name>[^:/ ]+)/.*'
    m1 = re.search(p1, target_hdfs_url)
    m2 = re.search(p2, target_hdfs_url)
    is_ha_cluster = False
    if m1 is None and m2 is None:
        logger.error("Invalid URL for target HDFS cluster")
        sys.exit(1)
    elif m1 is not None and m2 is None:
        host = m1.group('host')
        port = m1.group('port')
        active_target_host = host
    elif m1 is None and m2 is not None:
        is_ha_cluster = True
        name_service = m2.group('name')
        if not active_target_host:
            logger.error("Please specify --active_target_host for an HA target HDFS cluster")
            sys.exit(1)
    else:
        logger.error("The URL for the target HDFS cluster matches both the non-HA and HA patterns")
        sys.exit(1)

    cmd = "hdfs dfsadmin -allowSnapshot %s" % (source_hdfs_url)
    result = local_ssh(cmd, logger)
    if result != 0:
        logger.error("Failed to enable snapshot on source HDFS cluster")
        sys.exit(1)

    cmd = "hdfs dfs -createSnapshot %s %s" % (source_hdfs_url, target_snapshot)
    result = local_ssh(cmd, logger)
    if result != 0:
        logger.error("Failed to create snapshot on source HDFS cluster")
        sys.exit(1)

    if os.environ.get('HADOOP_HOME') is None:
        logger.error("Failed to get Hadoop home directory: HADOOP_HOME is not set")
        sys.exit(1)

    # Copy only the differences between the two snapshots to the target cluster.
    cmd = "%s/bin/hadoop distcp -update -diff %s %s %s %s" % (os.environ['HADOOP_HOME'], source_snapshot, target_snapshot, source_hdfs_url, target_hdfs_url)
    result = local_ssh(cmd, logger)
    if result != 0:
        logger.error("Failed to copy data changes from source to target HDFS cluster")
        sys.exit(1)

    cmd = "hdfs dfsadmin -allowSnapshot %s" % (target_hdfs_url)
    result = remote_ssh(cmd, active_target_host, "")
    if result != 0:
        logger.error("Failed to enable snapshot on target HDFS cluster")
        sys.exit(1)

    cmd = "hdfs dfs -createSnapshot %s %s" % (target_hdfs_url, target_snapshot)
    result = remote_ssh(cmd, active_target_host, "")
    if result != 0:
        logger.error("Failed to create snapshot on target HDFS cluster")
        sys.exit(1)

    logger.info("Done with user data backup in incremental mode")
    logger.info("Backed up OushuDB metadata and user data successfully.")


if __name__ == '__main__':
    options, args = parseargs()

    GPHOME = os.getenv('GPHOME')
    if not GPHOME:
        sys.exit("GPHOME environment variable is not set; please set it (for example by sourcing greenplum_path.sh)")
    hawq_site = HawqXMLParser(GPHOME)
    hawq_site.get_all_values()

    if options.full:
        backup_fully(hawq_site, options.snapshot, options.target_mdd, options.target_hdfs_url, options.active_target_host)
    elif options.incremental:
        backup_incrementally(hawq_site, options.source_snapshot, options.target_snapshot, options.target_mdd, options.target_hdfs_url, options.active_target_host)
    else:
        logger.error("No action specified: please use either --full or --incremental (see --help)")
        sys.exit(1)