blob: 61dd4233a1ba82a4b12711ae4330dd43fb636744 [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''
hawqfilespace [options]
General Options:
-? | --help show this help message and exit
-v | --version show the program's version number and exit
-l | --logdir DIRECTORY log file directory
Connection Options:
-h | --host HOSTNAME database server host
-p | --port PORT database server port
-U | --username NAME database user name
-W | --password prompt for password
Execution Options:
-c | --config FILE configuration file
-o | --output DIRECTORY output directory for config file
Move Options:
--movefilespace FILESPACE | default move filespace to new location on distributed file system
--location <dfslocation> new location which the filespace will be moved to
'''
# ============================================================================
__version__ = '$Revision$'
# ============================================================================
import os
import sys

# File name of this script; used for the help-file lookup, the version
# banner and the logging setup further down.
EXECNAME = os.path.basename(__file__)
# ============================================================================
# Python version 2.6.2 is expected, must be between 2.5-3.0
# ============================================================================
# Refuse to run on any interpreter outside the [2.5, 3.0) range.
if not ((2, 5, 0) <= sys.version_info < (3, 0, 0)):
    sys.stderr.write("""
Error: %s is supported on Python versions 2.5 or greater
Please upgrade python installed on this machine.
""" % EXECNAME)
    sys.exit(1)
# ============================================================================
import optparse # Option Parsing
import subprocess # Calling external processes
import traceback # exception logging
import re
import commands
from time import strftime
try:
from gppylib.commands import unix
from gppylib.db import dbconn
from gppylib.db import catalog
from pygresql import pg # Database interaction
from gppylib.gplog import * # Greenplum logging facility
from getpass import getpass
from gppylib.parseutils import line_reader, parse_fspacename, parse_dfs_url, parse_fspacesys, parse_fspacereplica, parse_gpfilespace_line, \
canonicalize_address
from gppylib.operations.filespace import MoveFilespaceError, MoveFileSpaceLocation, CheckSuperUser
except ImportError, e:
sys.exit('Error: unable to import module: ' + str(e))
# ============================================================================
class GPError(StandardError):
    '''Base class for the errors this utility raises and reports itself.'''


class GPDatabaseError(GPError):
    '''Raised when a database operation fails.'''


class GPPrivilegeError(GPError):
    '''Raised when the connected database user is not a superuser.'''


# DEBUGGING (to simulate ebay set to 98)
CLUSTERSIZE = 1
# same as gpstart, used for hostcache
DEFAULT_NUM_WORKERS = 64
# ============================================================================
def cli_help():
    '''
    Return the help text for this utility.

    The text is read from ``../docs/cli_help/<EXECNAME>_help`` relative to
    sys.path[0]. If that file cannot be opened or read, the module
    docstring is returned instead.
    '''
    help_path = os.path.join(sys.path[0], '../docs/cli_help',
                             EXECNAME + '_help')
    # "except Exception" rather than a bare except: a missing or unreadable
    # help file still falls back to __doc__, but KeyboardInterrupt and
    # SystemExit are no longer swallowed.
    try:
        f = open(help_path)
    except Exception:
        return __doc__
    try:
        return f.read()
    except Exception:
        return __doc__
    finally:
        f.close()
def usage():
    '''Print the help text returned by cli_help() on stdout.'''
    help_text = cli_help()
    print(help_text)
# ============================================================================
def ParseOptions():
    '''
    Parse the command line options passed to the script.

    Defaults are taken from the environment: PGUSER, PGHOST and PGPORT for
    the connection, HOME for the log directory, and the current working
    directory for the output location.

    Returns the optparse options object, with ``port`` converted to an int.

    Exits the process after printing the version or the help text, when
    unexpected positional arguments are given, or when the port is not an
    integer.
    '''
    # Determine Default Values
    home = os.environ.get("HOME")
    if home is None:
        # Without HOME, os.path.join(None, ...) would crash; fall back to
        # the home directory the OS reports for the current user.
        home = os.path.expanduser("~")
    logdir = os.path.join(home, "hawqAdminLogs")
    pguser = os.environ.get("PGUSER") or unix.getUserName()
    pghost = os.environ.get("PGHOST") or unix.getLocalHostname()
    pgport = os.environ.get("PGPORT") or 5432
    output = os.getcwd()

    # Setup Options Parser
    parser = optparse.OptionParser(usage=cli_help(), add_help_option=False)
    parser.add_option('--version', default=False,
                      action='store_true', help=optparse.SUPPRESS_HELP)
    parser.add_option('-l', '--logdir', default=logdir,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-h', '--host', default=pghost,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-?', '--help', default=False,
                      action='store_true', help=optparse.SUPPRESS_HELP)
    parser.add_option('-p', '--port', default=pgport,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-U', '--username', default=pguser,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-W', '--password', default=False,
                      action='store_true', help=optparse.SUPPRESS_HELP)
    parser.add_option('-c', '--config', default=None,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-o', '--output', default=output,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-v', '--verbose',
                      action='store_true', help=optparse.SUPPRESS_HELP)
    parser.add_option('-a', dest='interactive',
                      action='store_false', help=optparse.SUPPRESS_HELP)
    parser.add_option('-q', '--quiet',
                      action='store_true', help=optparse.SUPPRESS_HELP)
    parser.add_option('--movefilespace')
    parser.add_option('--location')
    (options, args) = parser.parse_args()

    # Print version and exit
    if options.version:
        print(EXECNAME + ' ' + __version__)
        sys.exit(0)

    # Print help and exit
    if options.help:
        usage()
        sys.exit(0)

    # We don't support any arguments not listed above
    if len(args) > 0:
        parser.error("unknown arguments %s" % repr(args))

    # Make sure that port is an integer
    # We check this here rather than in parser.add_option because the default
    # value from getenv() might fail this check, and OptionParser doesn't
    # like receiving bad default values
    try:
        options.port = int(options.port)
    except (TypeError, ValueError):
        # The module-level logger is still None while options are being
        # parsed (it is created only afterwards), so report on stderr.
        sys.stderr.write("Invalid PORT: '%s'\n" % options.port)
        sys.exit(1)
    return options
# ============================================================================
def getstring(prompt, question=False, default=""):
    '''
    getstring(prompt, question, default)
    Ask the user for one line of input on STDIN and return it stripped.

    When question is given it is printed before the prompt, followed by the
    default if there is one. An empty answer yields default.
    Exits with status 1 when stdin is not a terminal or the read fails;
    Control-C and Control-D are turned into KeyboardInterrupt.
    '''
    # Prompting only works on an interactive terminal
    if not sys.stdin.isatty():
        logger.error('stdin is not a tty')
        logger.error('hawqfilespace must be run from an interactive terminal')
        sys.exit(1)

    try:
        # Show the question (with its default, if any), then read the answer
        if question:
            if default:
                print("%s (default=%s)" % (question, default))
            else:
                print(question)
        answer = raw_input(prompt).strip()
        if answer == "":
            return default
        return answer
    except (KeyboardInterrupt, EOFError):
        # Control-C or Control-D: end the prompt line, then cancel
        print("")
        raise KeyboardInterrupt()
    except Exception:
        # Some other exception
        failure = sys.exc_info()[1]
        logger.error('failure reading from stdin')
        logger.error(repr(failure))
        sys.exit(1)
# ============================================================================
# ============================================================================
def getbool(prompt, question, default):
    '''
    getbool(prompt, question, default)
    Keep asking (through getstring) until the answer is a yes or a no.
    Returns True for y/ye/yes and False for n/no, case-insensitively.
    '''
    affirmative = ("y", "ye", "yes")
    negative = ("n", "no")
    while True:
        answer = getstring(prompt, question, default).lower()
        if answer in affirmative:
            return True
        if answer in negative:
            return False
# Messages for the gpcheckhdfs exit codes that need no extra context.
_GPCHECKHDFS_MESSAGES = {
    101: "Failed to Open or Write a File in HDFS",
    102: "Failed to Write File to HDFS",
    103: "Failed to Flush,Check your Datanode",
    104: "Failed to Delete a File.",
    105: "DFS Directory Error",
    106: "Get Delegation Token Error",
    107: "Failed to Login to Kerberos Server",
}


def _show_setting(db, name):
    '''Return the value the server reports for "show <name>".'''
    cursor = dbconn.execSQL(db, "show %s" % name)
    db.commit()  # Should move into execSQL ?
    return cursor.fetchall()[0][0]


def _gpcheckhdfs_exit_code(status):
    '''Convert the wait()-style status of getstatusoutput to an exit code.'''
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSIGNALED(status):
        # Same convention as subprocess: negative signal number
        return -os.WTERMSIG(status)
    return status


def _gpcheckhdfs_message(code, command, dfs_name, dfs_url):
    '''Return the message for a known gpcheckhdfs exit code, else None.'''
    if code == 1:
        return "gpcheckhdfs run error,Please check: %s" % (command)
    if code == 100:
        return "Failed to Connect to %s://%s" % (dfs_name, dfs_url)
    return _GPCHECKHDFS_MESSAGES.get(code)


def getdir(prompt, hosts=[], primary=None, shared=False, fsysn=None, db=None):
    '''
    getdir(prompt, hosts)
    Prompt until the user enters an acceptable location and return it.

    shared=False: the input must be an absolute local path; it is returned
    normalised with os.path.normpath.

    shared=True: the input is treated as a DFS url. The Kerberos related
    settings are read from the database connection db and the external
    gpcheckhdfs program is run against the url for file system fsysn.
    When gpcheckhdfs fails with one of its known exit codes the reason is
    printed and the user is asked again. Any other non-zero code is
    reported but the location is still accepted, as before.

    hosts and primary are accepted for compatibility and are not used.

    Raises GPDatabaseError when reading the settings from the database
    fails.
    '''
    while True:
        value = getstring(prompt)
        if len(value) == 0:
            continue

        if not shared:
            if not os.path.isabs(value):
                print("Must specify a full path")
                continue
            return os.path.normpath(value)

        # TODO: check dfs uri
        try:
            krbstatus = _show_setting(db, "enable_secure_filesystem")
            krb_srvname = _show_setting(db, "krb_srvname")
            krb_keyfile = _show_setting(db, "krb_server_keyfile")
        except Exception:
            raise GPDatabaseError(str(sys.exc_info()[1]))

        command = "gpcheckhdfs %s %s %s %s %s" % (
            fsysn, value, krbstatus, krb_srvname, krb_keyfile)
        (status, _output) = commands.getstatusoutput(command)
        # getstatusoutput reports the status in wait() encoding (exit code
        # 100 arrives as 25600), so decode it before comparing it with the
        # gpcheckhdfs exit codes.
        code = _gpcheckhdfs_exit_code(status)
        if code != 0:
            print(command)
            print("gpcheckhdfs error code is %s" % code)
            message = _gpcheckhdfs_message(code, command, fsysn, value)
            if message is not None:
                print(message)
                continue
        return value
# Helper function to validate reasonable names for filespaces
def isValidIdentifier(name):
    '''
    Return True when name is an acceptable filespace name, False otherwise.

    Two forms are accepted:
      - a bare identifier: a letter followed by letters, digits or '_'
      - a double-quoted identifier: at least one character between the
        quotes, with any embedded double quote written as ""
    Names starting with pg_ or gp_ (also right after an opening quote) are
    reserved and rejected. An explanation is printed for every rejected
    name except the empty string.
    '''
    if name == "":
        return False

    # A quoted identifier needs both delimiters, so a lone '"' is not one.
    quoted = len(name) >= 2 and name[0] == '"' and name[-1] == '"'
    if quoted:
        name = name[1:-1]

    for prefix in ('pg_', 'gp_'):
        if name.startswith((prefix, '"' + prefix)):
            print("[Error] The name prefix '%s' is reserved for "
                  "system filespaces\n" % prefix)
            return False

    # Check for identifier names.
    # The quoted pattern consumes one character or one "" pair per step, so
    # it needs at least one character and cannot backtrack exponentially.
    # \Z (not $) so that a trailing newline is not silently accepted.
    if quoted:
        regex = r'(?:[^"]|"")+\Z'
    else:
        regex = r'[a-zA-Z][a-zA-Z0-9_]*\Z'
    if re.match(regex, name):
        return True

    if quoted:
        print("[Error] invalid identifier\n")
    else:
        print("[Error] invalid identifier")
        print(" [Hint] non-alphanumeric identifers should be double-"
              "quoted\n")
    return False
# ============================================================================
class GPFilespace:
    '''
    Greenplum Filespace Management Utility

    Holds the parsed options and the database connection and implements the
    two modes of the tool: generating a filespace configuration file from
    interactive input (generate_config) and creating a filespace from such
    a file (execute_config).
    '''
    # --------------------------------------------------------------------
    def __init__(self, options):
        '''
        GPFilespace(options)
        Creates an instance of a GPFilespace with the parsed options.

        Only the container object is set up here. No database connection
        is made until Setup() or connect() is called.

        Usage:
            options = ParseOptions()
            X = GPFilespace(options)
        '''
        self.options = options
        self.fspacename = None      # name for the filespace
        self.dir = options.output
        self.master = None          # [] list of master segments
        self.segments = None        # {} dbid => GpDB resolution
        self.hosts = None           # {} hostname => GPHost resolution
        self.nics = None            # {} nic => hostname resolution
        self.config = None          # {} host configuration groups
        self.db = None              # pygresql.pgobject
        self.dburl = None
        self.pool = None
        self.array = None
        self.hostcache = None
        self.default_fsysname = 'hdfs'
        self.default_fsreplica = 3

    def Setup(self):
        '''
        GPFilespace::Setup()
        Handle initial setup: connect to the database and read the segment
        configuration.
        Once Setup() has been called it is imperative that cleanup() be
        called on termination.
        '''
        self.connect()
        self.get_configuration()
        #self.resolve_hostnames()

    # --------------------------------------------------------------------
    def cleanup(self):
        '''
        GPFilespace::cleanup()
        Halt the worker pool, if there is one, and close the database
        connection.
        '''
        if self.pool:
            logger.debug('halting threads')
            self.pool.haltWork()
            self.pool = None
        self.close_conn()

    # --------------------------------------------------------------------
    def connect(self):
        '''
        GPFilespace::connect()
        Establishes a connection to database template1 as specified in the
        options and checks that the database user is a superuser.
        Does nothing when a connection already exists.

        options.password may be the -W flag (True), in which case the
        password is prompted for here, or a password string the caller has
        already collected, which is used as is.

        throws:
            GPDatabaseError  - failure communicating with the database
            GPPrivilegeError - database user is not a database superuser

        Usage:
            X = GPFilespace(options)
            X.connect()
        '''
        # If we are already connected simply return
        if self.db:
            return
        try:
            user = self.options.username
            host = self.options.host
            port = self.options.port
            db = 'template1'
            password = self.options.password
            if password is True:
                password = getpass("Password: ")
            elif not password:
                password = None
            # Otherwise the caller already replaced the flag with the typed
            # password; use it instead of prompting a second time.
            dburl = dbconn.DbURL(username=user, hostname=host,
                                 port=port, dbname=db, password=password)
            conn = dbconn.connect(dburl)
            q = "SELECT usesuper FROM pg_user where usename = user"
            rows = catalog.basicSQLExec(conn, q)
            is_super = rows[0][0]
        except Exception:
            raise GPDatabaseError(str(sys.exc_info()[1]))
        if not is_super:
            error = "hawqfilespace requires database superuser privileges"
            raise GPPrivilegeError(error)
        self.dburl = dburl
        self.db = conn

    # --------------------------------------------------------------------
    def close_conn(self):
        '''Close the database connection, if one is open.'''
        if self.db:
            logger.debug('closing database connection')
            self.db.close()
            self.db = None

    # --------------------------------------------------------------------
    def get_configuration(self):
        '''
        GPFilespace::get_configuration()
        Reads gp_segment_configuration from the database and resets
        self.master, self.segments, self.hosts and self.config.

        NOTE(review): the host names sorted into master / standby /
        segments below are only held in local variables and are not stored
        on the object; self.master and self.segments stay empty.

        Usage:
            X = GPFilespace(options)
            X.get_configuration()
        '''
        # We should already have a connection, but if not establish one.
        self.connect()
        # These have pointers to the old GpDB so they must be destroyed
        # before we populate the new structures.
        self.master = []
        self.segments = {}
        self.hosts = None
        self.config = None
        query = "select * from gp_segment_configuration;"
        rows = dbconn.execSQL(self.db, query)
        segment_hosts = []
        if rows:
            for row in rows:
                # presumably column 1 is the role and column 4 the host
                # name -- confirm against the catalog definition
                if row[1] == 'm':
                    master_host = row[4]
                elif row[1] == 's':
                    standby_host = row[4]
                else:
                    segment_hosts.append(row[4])
        else:
            print("No host exist.")
            sys.exit(1)

    # --------------------------------------------------------------------
    def generate_config(self):
        '''
        generate_config()
        Interactively collects a filespace name, a replica number and a
        DFS location and writes them to a new configuration file.

        The file is placed in the output directory (options.output). When
        that path does not exist, its last component is used as the file
        name and the rest as the directory. Exits with status 1 if the
        directory is missing or not writeable.

        The DFS location is checked by getdir(). The filespace itself is
        only created later, when the generated file is passed back with
        --config.
        '''
        # determine output path location based on input directory.
        filename = "hawqfilespace_config_" + strftime("%Y%m%d_%H%M%S")
        if not os.access(self.dir, os.F_OK):
            (self.dir, filename) = os.path.split(self.dir)
        if len(self.dir) == 0:
            self.dir = os.getcwd()
        if not os.access(self.dir, os.F_OK):
            logger.error('No such file or directory: "%s"' % self.dir)
            logger.error('Unable to create config file')
            sys.exit(1)
        if not os.access(self.dir, os.W_OK):
            logger.error('Directory not writeable: "%s"' % self.dir)
            logger.error('Unable to create config file')
            sys.exit(1)
        filename = os.path.join(self.dir, filename)

        # Prompt for the filespace name:
        fspacename = ""
        question = "Enter a name for this filespace"
        while not isValidIdentifier(fspacename):
            fspacename = getstring("> ", question)
        logger.debug("Filespace Name: %s" % fspacename)

        # fsysname and fsreplica can be skipped here, this will let us read the
        # old config file easily.
        # get replica number.
        fsreplica = -1
        question = "Enter replica num for filespace. If 0, default replica num is used"
        while fsreplica < 0:
            repstr = getstring("> ", question, "3")
            try:
                fsreplica = int(repstr)
            except ValueError:
                fsreplica = -1
            if fsreplica < 0:
                print("replica num must be a positive integer or 0\n")
        logger.debug("Filespace Replica Num: %d" % fsreplica)

        print("\nPlease specify the DFS location for the filespace (for example: localhost:9000/fs)")
        prompt = "location> "
        location = getdir(prompt, hosts=[], primary=True, shared=True,
                          fsysn='hdfs', db=self.db)
        logger.debug("%s %s" % (prompt, location))

        # Write the file before announcing it, and close it explicitly so
        # the contents are flushed (try/finally: Python 2.5 has no "with"
        # without a __future__ import).
        f = open(filename, 'w')
        try:
            f.write('filespace:%s\n' % fspacename)
            f.write('fsreplica:%d\n' % fsreplica)
            f.write('dfs_url::%s' % location)
        finally:
            f.close()

        logger.info("[created]")
        logger.info("""
To add this filespace to the database please run the command:
hawqfilespace --config %s
""" % filename)
        return None

    # --------------------------------------------------------------------
    def execute_config(self):
        '''
        execute_config()
        Reads the configuration file named by options.config and creates
        the filespace it describes with a CREATE FILESPACE statement.

        throws:
            GPError         - the file cannot be opened, or the name,
                              replica number or dfs_url in it is missing
                              or invalid
            GPDatabaseError - the CREATE FILESPACE statement failed
        '''
        # Read the configuration file passed as input (-c filename)
        try:
            print("Reading Configuration file: '%s'" % self.options.config)
            config = open(self.options.config)
        except IOError:
            raise GPError(str(sys.exc_info()[1]))

        # The first line is the filespace name, optionally followed by the
        # replica number, then the DFS url:
        #
        #   filespace:name
        #   [fsreplica:replica]
        #   dfs_url::url
        fspacename = None
        fsysname = 'hdfs'
        fsreplica = None
        dfs_url = None
        try:
            for lineno, line in line_reader(config):
                processed = False
                if fspacename is None:
                    fspacename = parse_fspacename(self.options.config, lineno, line)
                    if not isValidIdentifier(fspacename):
                        raise GPError('Invalid filespace name: %s' % fspacename)
                    continue

                # From here on the options may be absent from the config
                # file and need careful handling.
                #
                # If the expected option is not on this line, do not move
                # on to the next line: the same line is offered to the
                # next parser.
                if fsreplica is None:
                    repstr = parse_fspacereplica(self.options.config, lineno, line)
                    if repstr is None:
                        logger.warn("no fsreplica at line %d: default fsreplica is '%s'" % (lineno, self.default_fsreplica))
                        fsreplica = self.default_fsreplica
                    else:
                        processed = True
                        fsreplica = -1
                        try:
                            fsreplica = int(repstr)
                            if fsreplica < 0:
                                raise ValueError()
                        except ValueError:
                            raise GPError('Invalid filespace replica num: %s' %repstr)
                if processed:
                    continue

                if dfs_url is None:
                    dfs_url = parse_dfs_url(self.options.config, lineno, line)
        finally:
            # The file is no longer needed once the lines have been read.
            config.close()

        if not fspacename:
            raise GPError("Filespace name not specified")
        if not fsysname:
            raise GPError("Filespace type not specified")
        if not dfs_url:
            raise GPError("Filespace dfs_url not specified")

        # Everything checks out, generate the SQL and execute it
        q = "\nCREATE FILESPACE %s ON %s \n('%s/%s')" % (fspacename, fsysname, dfs_url, fspacename)
        if fsreplica is not None and fsreplica > 0:
            q += " WITH (NUMREPLICA = %d);" % (fsreplica)
        logger.debug(q)
        print(q)
        logger.info("Connecting to database")
        self.connect()
        try:
            dbconn.execSQL(self.db, q)
            self.db.commit()  # Should move into execSQL ?
        except Exception:
            raise GPDatabaseError(str(sys.exc_info()[1]))
        logger.info('Filespace "%s" successfully created' % fspacename)
        sys.stdout.flush()
# ============================================================================
# main()
# ============================================================================
logger = None
if __name__ == '__main__':
    opt = ParseOptions()
    if opt.verbose:
        enable_verbose_logging()
    if opt.quiet:
        quiet_stdout_logging()
    logger = setup_tool_logging(EXECNAME, opt.host, opt.username, opt.logdir)

    # -W only records that a password is wanted; read it now
    opt.password = getpass('Password: ') if opt.password else None

    try:
        if not opt.movefilespace:
            # Generate a config file, or create a filespace from one
            gpf = GPFilespace(opt)
            try:
                gpf.Setup()
                if opt.config is None:
                    gpf.generate_config()
                else:
                    gpf.execute_config()
            finally:
                gpf.cleanup()
        else:
            # Move an existing filespace to a new DFS location
            logger.debug("Start filespace move")
            logger.info("""
A tablespace requires a file system location to store its database
files. A filespace is a collection of file system locations for all components
in a HAWQ system.
Once a filespace is created, it can be used by one or more tablespaces.
""")
            if opt.movefilespace == 'default':
                opt.movefilespace = "dfs_system"
            if not opt.location:
                usage()
                logger.error("Target location is not set")
                sys.exit(1)
            logger.warn('PLEASE STOP DATABASE AND BACKUP MASTER DATA DIRECTORY IN CASE OF LOSING DATA!')
            question = "I understand the risk and already backuped master data directory? (Y|N)"
            if not getbool("> ", question, "N"):
                raise KeyboardInterrupt()
            mover = MoveFileSpaceLocation(opt.movefilespace, opt.location,
                                          opt.username, opt.password)
            mover.run()
            logger.info("Move filespace successfully.")
            logger.info("IF STANDBY MASTER IS CONFIGURED, PLEASE REMOVE IT AND INITIALIZE A NEW ONE!")
    except MoveFilespaceError:
        logger.error(sys.exc_info()[1])
        sys.exit(1)
    # An expected error
    except GPError:
        e = sys.exc_info()[1]
        try:
            # Cast to unicode using UTF-8 to work around issues with the
            # logging module: logging a non-ASCII byte string directly
            # ends in a UnicodeDecodeError
            err = unicode(str(e), 'UTF-8')
            logger.error(err)
            logger.debug("exit(1)")
        except BaseException:
            e = sys.exc_info()[1]
            sys.stderr.write("[Error] %s\n" % str(e))
            sys.stderr.write(traceback.format_exc())
            sys.stderr.flush()
        sys.exit(1)
    # User cancelation
    except KeyboardInterrupt:
        try:
            logger.info('[Cancelled]')
        except BaseException:
            e = sys.exc_info()[1]
            sys.stderr.write("error %s\n" % str(e))
        sys.exit(1)
    # Let SystemExit exceptions through
    except SystemExit:
        e = sys.exc_info()[1]
        try:
            logger.debug('exit(%s)' % str(e))
        except BaseException:
            # As before, a failure while logging replaces the exception
            # that is re-raised below
            e = sys.exc_info()[1]
            sys.stderr.write("error %s\n" % str(e))
        raise e
    # Catch anything else - shouldn't ever occur
    except BaseException:
        e = sys.exc_info()[1]
        try:
            err = '[%s]: "%s"' % (type(e).__name__, unicode(str(e), 'UTF-8'))
            logger.error(err)
            logger.debug(traceback.format_exc())
            logger.debug("exit(1)")
            sys.stderr.write('[Error] See "%s" for details\n' % get_logfile())
        except BaseException:
            e = sys.exc_info()[1]
            sys.stderr.write("error %s\n" % str(e))
        sys.exit(1)