blob: 99412496464dc0daa8d73d6c78bbe85c0fb3cf2d [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
hawqarray.py:
Contains three classes representing configuration information of a
Greenplum array:
HAWQArray - The primary interface - collection of all HAWQDB within an array
HAWQDB - represents configuration information for a single registration_order
Segment - collection of all HAWQDB with the same registration_order
"""
# ============================================================================
from datetime import date
import copy
import traceback
from gppylib.utils import checkNotNone, checkIsInt
from gppylib import gplog
from gppylib.db import dbconn
from gppylib.gpversion import GpVersion
from gppylib.commands.unix import *
from hawqpylib.hawqlib import HawqXMLParser
SYSTEM_FILESPACE = 3052 # oid of the system filespace
logger = gplog.get_default_logger()
DESTINATION_FILE_SPACES_DIRECTORY = "fs_directory"
ROLE_MASTER = 'm'
ROLE_STANDBY = 's'
ROLE_PRIMARY = 'p'
VALID_ROLES = [ROLE_MASTER, ROLE_STANDBY, ROLE_PRIMARY]
STATUS_UP = 'u'
STATUS_DOWN = 'd'
VALID_STATUS = [STATUS_UP, STATUS_DOWN]
# SegmentState values returned from gp_primarymirror.
SEGMENT_STATE_NOT_INITIALIZED = "NotInitialized"
SEGMENT_STATE_INITIALIZATION = "Initialization"
SEGMENT_STATE_IN_CHANGE_TRACKING_TRANSITION = "InChangeTrackingTransition"
SEGMENT_STATE_IN_RESYNCTRANSITION = "InResyncTransition"
SEGMENT_STATE_IN_SYNC_TRANSITION = "InSyncTransition"
SEGMENT_STATE_READY = "Ready"
SEGMENT_STATE_CHANGE_TRACKING_DISABLED = "ChangeTrackingDisabled"
SEGMENT_STATE_FAULT = "Fault"
SEGMENT_STATE_SHUTDOWN_BACKENDS = "ShutdownBackends"
SEGMENT_STATE_SHUTDOWN = "Shutdown"
SEGMENT_STATE_IMMEDIATE_SHUTDOWN = "ImmediateShutdown"
MASTER_REGISTRATION_ORDER = 0
class InvalidSegmentConfiguration(Exception):
"""Exception raised when an invalid hawqarray configuration is
read from gp_segment_configuration or an attempt to save an
invalid hawqarray configuration is made."""
def __init__(self, array):
self.array = array
def __str__(self):
return "Invalid HAWQArray: %s" % self.array
# ============================================================================
# ============================================================================
class HAWQDB:
"""
HAWQDB class representing configuration information for a single
registration_order within a HAWQ cluster.
"""
# --------------------------------------------------------------------
def __init__(self, registration_order, role, status,
hostname, address, port, datadir):
self.registration_order = registration_order
self.role = role
self.status = status
self.hostname = hostname
self.address = address
self.port = port
self.datadir = datadir
self.catdir = datadir
# Filespace mappings for a HAWQ DB
self.filespaces = None
# Pending filespace creation
self.pending_filespace = None
# Check if the status is 'u' up, 'd' for down
self.valid = (status == 'u')
# --------------------------------------------------------------------
def __str__(self):
"""
Construct a printable string representation of a HAWQDB
"""
return "%s:%s:registration_order=%s:status=%s" % (
self.hostname,
self.datadir,
self.registration_order,
self.status
)
#
# Note that this is not an ideal comparison -- it uses the string representation
# for comparison
#
def __cmp__(self,other):
left = repr(self)
right = repr(other)
if left < right: return -1
elif left > right: return 1
else: return 0
def equalIgnoringStatus(self, other):
"""
Return true if none of the "core" attributes (e.g. filespace)
of two segments differ, false otherwise.
This method is used by updateSystemConfig() to know when a catalog
change will cause removing and re-adding a mirror segment.
"""
firstStatus = self.getStatus()
try:
# make the elements we don't want to compare match and see if they are then equal
self.setStatus(other.getStatus())
return self == other
finally:
# restore mode and status after comaprison
self.setStatus(firstStatus)
# --------------------------------------------------------------------
@staticmethod
def getDataDirPrefix(datadir):
retValue = ""
retValue = datadir[:datadir.rfind('/')]
return retValue
# --------------------------------------------------------------------
@staticmethod
def getFileSpaceDirsWithNewSuffix(fileSpaceDictionary, suffix, includeSystemFilespace = True):
"""
This method will take the a dictionary of file spaces and return the same dictionary with the new sufix.
"""
retValue = {}
for entry in fileSpaceDictionary:
if entry == SYSTEM_FILESPACE and includeSystemFilespace == False:
continue
newDir = HAWQDB.getDataDirPrefix(fileSpaceDictionary[entry])
newDir = newDir + "/" + suffix
retValue[entry] = newDir
return retValue
# --------------------------------------------------------------------
def copy(self):
"""
Creates a copy of the segment, shallow for everything except the filespaces map
"""
res = copy.copy(self)
res.filespaces = copy.copy(self.filespaces)
return res
# --------------------------------------------------------------------
# Six simple helper functions to identify what role a segment plays:
# + QD (Query Dispatcher)
# + master
# + standby master
# + QE (Query Executor)
# + primary
# --------------------------------------------------------------------
def isMaster(self):
return self.role == ROLE_MASTER
def isStandby(self):
return self.role == ROLE_STANDBY
def isSegment(self):
return self.role == ROLE_PRIMARY
def isUp(self):
return self.status == STATUS_UP
def isDown(self):
return self.status == STATUS_DOWN
# --------------------------------------------------------------------
# getters
# --------------------------------------------------------------------
def getRegistrationOrder(self):
return checkNotNone("registration_order", self.registration_order)
def getRole(self):
return checkNotNone("role", self.role)
def getStatus(self):
return checkNotNone("status", self.status)
def getPort(self):
"""
Returns the listening port for the postmaster for this segment.
Note: With file replication the postmaster will not be active for
mirrors so nothing will be listening on this port, instead the
"replicationPort" is used for primary-mirror communication.
"""
return checkNotNone("port", self.port)
def getHostName(self):
"""
Returns the actual `hostname` for the host
Note: use getSegmentAddress for the network address to use
"""
return self.hostname
def getAddress(self):
"""
Returns the network address to use to contact the segment (i.e. the NIC address).
"""
return self.address
def getDataDirectory(self):
"""
Return the primary datadirectory location for the segment.
Note: the datadirectory is just one of the filespace locations
associated with the segment, calling code should be carefull not
to assume that this is the only directory location for this segment.
Todo: evaluate callers of this function to see if they should really
be dealing with a list of filespaces.
"""
return checkNotNone("dataDirectory", self.datadir)
def getFilespaces(self):
"""
Returns the filespace dictionary of oid->path pairs
"""
return self.filespaces
# --------------------------------------------------------------------
# setters
# --------------------------------------------------------------------
def setRegistrationOrder(self, registration_order):
checkNotNone("registration_order", registration_order)
checkIsInt("registration_order", registration_order)
self.registration_order = registration_order
def setRole(self, role):
checkNotNone("role", role)
if role not in VALID_ROLES:
raise Exception("Invalid role '%s'" % role)
self.role = role
def setStatus(self, status):
checkNotNone("status", status)
if status not in VALID_STATUS:
raise Exception("Invalid status '%s'" % status)
self.status = status
def setPort(self, port):
checkNotNone("port", port)
checkIsInt("port", port)
self.port = port
def setHostName(self, hostName):
# None is allowed -- don't check
self.hostname = hostName
def setAddress(self, address):
# None is allowed -- don't check
self.address = address
def setDataDirectory(self, dataDirectory):
checkNotNone("dataDirectory", dataDirectory)
self.datadir = dataDirectory
def addFilespace(self, oid, path):
"""
Add a filespace path for this segment.
Throws:
Exception - if a path has already been specified for this segment.
"""
# gpfilespace adds a special filespace with oid=None to indicate
# the filespace that it is currently building, since the filespace
# does not yet exist there is no valid value that could be used.
if oid == None:
if self.pending_filespace:
raise Exception("Duplicate filespace path for registration_order %d" %
self.registration_order)
self.pending_filespace = path
return
# oids should always be integer values > 0
oid = int(oid)
assert(oid > 0)
# The more usual case just sets the filespace in the filespace
# dictionary
if oid in self.filespaces:
raise Exception("Duplicate filespace path for "
"registration_order %d filespace %d" % (self.registration_order, oid))
self.filespaces[oid] = path
def getPendingFilespace(self):
"""
Returns the pending filespace location for this segment
(called by gpfilespace)
"""
return self.pending_filespace
class HAWQFilesystemObj:
"""
List information for a filesystem, as stored in pg_filesystem
"""
def __init__(self, oid, name, shared):
self.__oid = oid
self.__name = name
self.__shared = shared
def getOid(self):
return self.__oid
def getName(self):
return self.__name
def isShared(self):
return self.__shared == True
@staticmethod
def getFilesystemObj(filesystemArr, fsoid):
# local storage
if fsoid == 0:
return None
# plugin storage
for fsys in filesystemArr:
if (fsys.getOid() == fsoid):
return fsys
raise Exception("Error: invalid file system oid %d" % (fsoid))
class HAWQFilespaceObj:
"""
List information for a filespace, as stored in pg_filespace
"""
def __init__(self, oid, name, fsys):
self.__oid = oid
self.__name = name
self.__fsys = fsys
def getOid(self):
return self.__oid
def getName(self):
return self.__name
def getFsys(self):
return self.__fsys
def isSystemFilespace(self):
return self.__oid == SYSTEM_FILESPACE
class HAWQArray:
"""
HAWQArray is a python class that describes a HAWQ array.
A HAWQ array consists of:
master - The primary QD for the array
standby master - The mirror QD for the array [optional]
segment array - an array of segments within the cluster
Each segment is either a single HAWQDB object, or a primary/mirror pair.
It can be initialized either from a database connection, in which case
it discovers the configuration information by examining the catalog, or
via a configuration file.
"""
# --------------------------------------------------------------------
def __init__(self, hawqdbs):
"""
segmentsInDb is used only be the configurationImpl* providers; it is used to track the state of the
segments in the database
TODO:
"""
self.master = None
self.standbyMaster = None
self.segments = []
self.numSegments = 0
self.version = None
self.setFilespaces([])
for hdb in hawqdbs:
# Handle master
if hdb.isMaster():
if self.master != None:
logger.error("multiple master dbs defined")
raise Exception("HAWQArray - multiple master dbs defined")
self.master = hdb
# Handle standby
elif hdb.isStandby():
if self.standbyMaster != None:
logger.error("multiple standby master dbs defined")
raise Exception("HAWQArray - multiple standby master dbs defined")
self.standbyMaster = hdb
# Handle segments
elif hdb.isSegment():
self.addSegment(hdb)
else:
# Not a master, standbymaster, primary, or mirror?
# shouldn't even be possible.
logger.error("FATAL - invalid dbs defined")
raise Exception("Error: HAWQArray() - invalid dbs defined")
# Make sure HAWQ cluster has a master
if self.master is None:
logger.error("FATAL - no master defined!")
raise Exception("Error: HAWQArray() - no master defined")
def __str__(self):
return "Master: %s\nStandby: %s\nSegments: %s" % (str(self.master),
str(self.standbyMaster) if self.standbyMaster else 'Not Configured',
"\n".join([str(seg) for seg in self.segments]))
def addSegment(self, hdb):
if hdb.isSegment():
self.segments.append(hdb)
self.numSegments += 1
else:
raise Exception("Error: adding invalid segment to HAWQArray")
# --------------------------------------------------------------------
@staticmethod
def initFromCatalog(dbURL, utility=False, useAllSegmentFileSpaces=False):
"""
Factory method, initializes a HAWQArray from provided database URL
Please note that -
useAllSegmentFilespaces when set to true makes this method add *all* filespaces
to the segments of hawqarray. If false, only returns Master/Standby all filespaces
This is *hacky* and we know that it is not the right way to design methods/interfaces
We are doing this so that we do not affect behavior of existing tools like upgrade, gprecoverseg etc
"""
conn = dbconn.connect(dbURL, utility)
# Get the version from the database:
version_str = None
for row in dbconn.execSQL(conn, "SELECT version()"):
version_str = row[0]
version = GpVersion(version_str)
# Now only support HAWQ 2.x
hawq_major_version = version.getVersionRelease().split('.')[0]
if hawq_major_version == '2':
hawq_site = HawqXMLParser(GPHOME)
master_data_directory = hawq_site.get_value_from_name('hawq_master_directory')
segment_data_directory = hawq_site.get_value_from_name('hawq_segment_directory')
config_rows = dbconn.execSQL(conn, '''
SELECT sc.registration_order,
sc.role,
sc.status,
sc.hostname,
sc.address,
sc.port,
CASE
WHEN sc.registration_order <= 0 THEN '%s'
ELSE '%s'
END AS datadir
FROM pg_catalog.gp_segment_configuration sc
ORDER BY sc.registration_order;''' %
(master_data_directory, segment_data_directory))
# All of filesystem is shared storage
filesystemRows = dbconn.execSQL(conn, '''
SELECT oid, fsysname, true AS fsysshared
FROM pg_filesystem
ORDER BY fsysname
''')
filesystemArr = [HAWQFilesystemObj(fsysRow[0], fsysRow[1], fsysRow[2]) for fsysRow in filesystemRows]
filespaceRows = dbconn.execSQL(conn, '''
SELECT oid, fsname, fsfsys AS fsoid
FROM pg_filespace
WHERE oid != %d
ORDER BY fsname;
''' % (SYSTEM_FILESPACE))
filespaceArr = [HAWQFilespaceObj(fsRow[0], fsRow[1], HAWQFilesystemObj.getFilesystemObj(filesystemArr, fsRow[2])) for fsRow in filespaceRows]
else:
raise Exception("HAWQ version is invalid: %s" % version)
hawqdbs = []
print "### initFromCatalog ###"
hdb = None
for row in config_rows:
print row
# Extract fields from the row
(registration_order, role, status, hostname,
address, port, datadir) = row
# In GPSQL, only master maintain the filespace information.
# if registration_order != MASTER_REGISTRATION_ORDER and \
# fsoid != SYSTEM_FILESPACE and \
# not useAllSegmentFileSpaces:
# print "### initFromCatalog ... continue ###"
# continue
# The query returns all the filespaces for a segment on separate
# rows. If this row is the same dbid as the previous row simply
# add this filespace to the existing list, otherwise create a
# new segment.
# if seg and seg.getSegmentRegistrationOrder() == registration_order:
# seg.addSegmentFilespace(fsoid, fslocation)
# else:
# seg = HAWQDB(registration_order, role, status,
# hostname, address, port, datadir)
# segments.append(seg)
hdb = HAWQDB(registration_order, role, status,
hostname, address, port, datadir)
print "### initFromCatalog ... hdb ###"
print hdb
hawqdbs.append(hdb)
print "### initFromCatalog ... hawqdbs ###"
print hawqdbs
conn.close()
# origSegments = [seg.copy() for seg in segments]
array = HAWQArray(hawqdbs)
array.version = version
array.setFilespaces(filespaceArr)
array.setFilesystem(filesystemArr)
return array
# --------------------------------------------------------------------
def is_array_valid(self):
"""Checks that each array is in a valid state"""
if self.master.getStatus() != STATUS_UP:
return False
if self.standbyMaster and self.standbyMaster.getStatus() != STATUS_UP:
return False
for seg in self.segments:
if not seg.status == STATUS_UP:
return False
return True
# --------------------------------------------------------------------
def setFilesystem(self, filesystemArr):
"""
@param filesystemArr of GpFilesystemObj objects
"""
self.filesystemArr = [fsys for fsys in filesystemArr]
def getFilesystem(self):
"""
@return a newly allocated list of GpFilespaceObj objects, will have been sorted by filesystem name
"""
return [fsys for fsys in self.filesystemArr]
def setFilespaces(self, filespaceArr):
"""
@param filespaceArr of GpFilespaceObj objects
"""
self.filespaceArr = [fs for fs in filespaceArr]
def getFilespaces(self, includeSystemFilespace=True):
"""
@return a newly allocated list of GpFilespaceObj objects, will have been sorted by filespace name
"""
return [fs for fs in self.filespaceArr if fs.isSystemFilespace()]
def getNonSystemFilespaces(self):
"""
@return a newly allocated list of GpFilespaceObj objects, will have been sorted by filespace name
"""
return [fs for fs in self.filespaceArr if not fs.isSystemFilespace()]
def getAllFilespaces(self):
"""
@return a newly allocated list of GpFilespaceObj objects, will have been sorted by filespace name
"""
return [fs for fs in self.filespaceArr]
# --------------------------------------------------------------
def getFileSpaceName(self, filespaceOid):
retValue = None
if self.filespaceArr != None:
for entry in self.filespaceArr:
if entry.getOid() == filespaceOid:
retValue = entry.getName()
break
return retValue
# --------------------------------------------------------------
def getFileSpaceOid(self, filespaceName):
retValue = None
if self.filespaceArr != None:
for entry in self.filespaceArr:
if entry.getName() == filespaceName:
retValue = entry.getOid()
break
return retValue
# --------------------------------------------------------------
def isFileSpaceShared(self, filespaceOid):
retValue = False
if self.filespaceArr != None:
for entry in self.filespaceArr:
if entry.getOid() == filespaceOid:
retValue = entry.getFsys() != None and entry.getFsys().isShared()
break
return retValue
# --------------------------------------------------------------------
def getDbList(self):
"""
Return a list of all HAWQDB objects that make up the array
"""
dbs=[]
dbs.append(self.master)
if self.standbyMaster:
dbs.append(self.standbyMaster)
dbs.extend(self.getSegDbList())
return dbs
# --------------------------------------------------------------------
def getHostList(self, includeExpansionSegs = False):
"""
Return a list of all Hosts that make up the array
"""
hostList = []
hostList.append(self.master.getSegmentHostName())
if self.standbyMaster:
hostList.append(self.standbyMaster.getSegmentHostName())
dbList = self.getDbList()
for db in dbList:
if db.getSegmentHostName() in hostList:
continue
else:
hostList.append(db.getSegmentHostName())
return hostList
def getSegDbList(self):
"""Return a list of all HAWQDB objects for all segments in the array"""
dbs=[]
for seg in self.segments:
dbs.append(seg)
return dbs