blob: 262de36661823d1c817259292154cceeb22628c5 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
gparray.py:
Contains three classes representing configuration information of a
Greenplum array:
GpArray - The primary interface - collection of all GpDB within an array
GpDB - represents configuration information for a single dbid
Segment - collection of all GpDB with the same content id
"""
# ============================================================================
from datetime import date
import copy
import traceback
from gppylib.utils import checkNotNone, checkIsInt
from gppylib import gplog
from gppylib.db import dbconn
from gppylib.gpversion import GpVersion
from gppylib.commands.unix import *
SYSTEM_FILESPACE = 3052 # oid of the system filespace
logger = gplog.get_default_logger()
DESTINATION_FILE_SPACES_DIRECTORY = "fs_directory"
ROLE_PRIMARY = 'p'
ROLE_MIRROR = 'm'
VALID_ROLES = [ROLE_PRIMARY, ROLE_MIRROR]
# Map gp_segment_configuration role values to values from gp_primarymirror.
ROLE_TO_MODE_MAP = {}
SEG_MODE_PRIMARY = "PrimarySegment"
SEG_MODE_MIRROR = "MirrorSegment"
ROLE_TO_MODE_MAP[ROLE_PRIMARY] = SEG_MODE_PRIMARY
ROLE_TO_MODE_MAP[ROLE_MIRROR] = SEG_MODE_MIRROR
STATUS_UP = 'u'
STATUS_DOWN = 'd'
VALID_STATUS = [STATUS_UP, STATUS_DOWN]
MODE_NOT_INITIALIZED = '' # no mirroring
MODE_CHANGELOGGING = 'c' # filerep logging
MODE_SYNCHRONIZED = 's' # filerep synchronized
MODE_RESYNCHRONIZATION = 'r' #
# Map gp_segment_configuration mode values to values retured from gp_primarymirror.
MODE_TO_DATA_STATE_MAP = {}
SEG_DATA_STATE_NOT_INITIALIZED = "NotInitialized"
SEG_DATA_STATE_IN_CHANGE_TRACKING = "InChangeTracking"
SEG_DATA_STATE_SYNCHRONIZED = "InSync"
SEG_DATA_STATE_IN_RESYNC = "InResync"
MODE_TO_DATA_STATE_MAP[MODE_NOT_INITIALIZED] = SEG_DATA_STATE_NOT_INITIALIZED
MODE_TO_DATA_STATE_MAP[MODE_CHANGELOGGING] = SEG_DATA_STATE_IN_CHANGE_TRACKING
MODE_TO_DATA_STATE_MAP[MODE_SYNCHRONIZED] = SEG_DATA_STATE_SYNCHRONIZED
MODE_TO_DATA_STATE_MAP[MODE_RESYNCHRONIZATION] = SEG_DATA_STATE_IN_RESYNC
# SegmentState values returned from gp_primarymirror.
SEGMENT_STATE_NOT_INITIALIZED = "NotInitialized"
SEGMENT_STATE_INITIALIZATION = "Initialization"
SEGMENT_STATE_IN_CHANGE_TRACKING_TRANSITION = "InChangeTrackingTransition"
SEGMENT_STATE_IN_RESYNCTRANSITION = "InResyncTransition"
SEGMENT_STATE_IN_SYNC_TRANSITION = "InSyncTransition"
SEGMENT_STATE_READY = "Ready"
SEGMENT_STATE_CHANGE_TRACKING_DISABLED = "ChangeTrackingDisabled"
SEGMENT_STATE_FAULT = "Fault"
SEGMENT_STATE_SHUTDOWN_BACKENDS = "ShutdownBackends"
SEGMENT_STATE_SHUTDOWN = "Shutdown"
SEGMENT_STATE_IMMEDIATE_SHUTDOWN = "ImmediateShutdown"
VALID_MODE = [
MODE_SYNCHRONIZED,
MODE_CHANGELOGGING,
MODE_RESYNCHRONIZATION,
]
MODE_LABELS = {
MODE_CHANGELOGGING: "Change Tracking",
MODE_SYNCHRONIZED: "Synchronized",
MODE_RESYNCHRONIZATION: "Resynchronizing"
}
# These are all the valid states primary/mirror pairs can
# be in. Any configuration other than this will cause the
# FTS Prober to bring down the master postmaster until the
# configuration is corrected. Here, primary and mirror refer
# to the segments current role, not the preferred_role.
#
# The format of the tuples are:
# (<primary status>, <prmary mode>, <mirror status>, <mirror_mode>)
VALID_SEGMENT_STATES = [
(STATUS_UP, MODE_CHANGELOGGING, STATUS_DOWN, MODE_SYNCHRONIZED),
(STATUS_UP, MODE_CHANGELOGGING, STATUS_DOWN, MODE_RESYNCHRONIZATION),
(STATUS_UP, MODE_RESYNCHRONIZATION, STATUS_UP, MODE_RESYNCHRONIZATION),
(STATUS_UP, MODE_SYNCHRONIZED, STATUS_UP, MODE_SYNCHRONIZED)
]
def getDataModeLabel(mode):
return MODE_LABELS[mode]
FAULT_STRATEGY_NONE = 'n' # mirrorless systems
FAULT_STRATEGY_FILE_REPLICATION = 'f' # valid for versions 4.0+
FAULT_STRATEGY_SAN = 's' # valid for versions 4.0+
FAULT_STRATEGY_READONLY = 'r' # valid for versions 3.x
FAULT_STRATEGY_CONTINUE = 'c' # valid for versions 3.x
FAULT_STRATEGY_LABELS = {
FAULT_STRATEGY_NONE: "none",
FAULT_STRATEGY_FILE_REPLICATION: "physical mirroring",
FAULT_STRATEGY_SAN: "SAN failover",
FAULT_STRATEGY_READONLY: "readonly",
FAULT_STRATEGY_CONTINUE: "continue",
}
VALID_FAULT_STRATEGY = FAULT_STRATEGY_LABELS.keys()
MASTER_CONTENT_ID = -1
def getFaultStrategyLabel(strategy):
return FAULT_STRATEGY_LABELS[strategy]
class InvalidSegmentConfiguration(Exception):
"""Exception raised when an invalid gparray configuration is
read from gp_segment_configuration or an attempt to save an
invalid gparray configuration is made."""
def __init__(self, array):
self.array = array
def __str__(self):
return "Invalid GpArray: %s" % self.array
# ============================================================================
# ============================================================================
class GpDB:
"""
GpDB class representing configuration information for a single dbid
within a Greenplum Array.
"""
# --------------------------------------------------------------------
def __init__(self, content, preferred_role, dbid, role, mode, status,
hostname, address, port, datadir, replicationPort):
# Todo: replace all these fields with private alternatives:
# e.g. '_content' instead of 'content'.
#
# Other code should go through class interfaces for access, this
# will allow easier modifications in the future.
self.content=content
self.preferred_role=preferred_role
self.dbid=dbid
self.role=role
self.mode=mode
self.status=status
self.hostname=hostname
self.address=address
self.port=port
self.datadir=datadir
self.replicationPort=replicationPort
# Filespace mappings for this segment
# Todo: Handle self.datadir more cleanly
# Todo: Handle initialization more cleanly
self.__filespaces = { SYSTEM_FILESPACE: datadir }
# Pending filespace creation
self.__pending_filespace = None
# Catalog directory for each database in this segment
self.catdirs = None
# Todo: Remove old dead code
# GPSQL: The preferred primary can be down status, but try to start!
self.valid = (preferred_role == 'p' and role == 'p') or (status == 'u')
# --------------------------------------------------------------------
def __str__(self):
"""
Construct a printable string representation of a GpDB
"""
return "%s:%s:content=%s:dbid=%s:mode=%s:status=%s" % (
self.hostname,
self.datadir,
self.content,
self.dbid,
self.mode,
self.status
)
#
# Note that this is not an ideal comparison -- it uses the string representation
# for comparison
#
def __cmp__(self,other):
left = repr(self)
right = repr(other)
if left < right: return -1
elif left > right: return 1
else: return 0
#
# Moved here from system/configurationImplGpdb.py
#
def equalIgnoringModeAndStatusAndReplicationPort(self, other):
"""
Return true if none of the "core" attributes (e.g. filespace)
of two segments differ, false otherwise.
This method is used by updateSystemConfig() to know when a catalog
change will cause removing and re-adding a mirror segment.
"""
firstMode = self.getSegmentMode()
firstStatus = self.getSegmentStatus()
firstReplicationPort = self.getSegmentReplicationPort()
try:
# make the elements we don't want to compare match and see if they are then equal
self.setSegmentMode(other.getSegmentMode())
self.setSegmentStatus(other.getSegmentStatus())
self.setSegmentReplicationPort(other.getSegmentReplicationPort())
return self == other
finally:
#
# restore mode and status after comaprison
#
self.setSegmentMode(firstMode)
self.setSegmentStatus(firstStatus)
self.setSegmentReplicationPort(firstReplicationPort)
# --------------------------------------------------------------------
def __repr__(self):
"""
Construct a string representation of class, must be sufficient
information to call initFromString on the result and deterministic
so it can be used for __cmp__ comparison
"""
# Note: this doesn't currently handle "pending filespaces", but
# this is not currently required since gpfilespace is the only code
# that generates pending filespaces and it never serializes a gparray
# object.
fsOids = [oid for oid in self.__filespaces]
fsOids.sort() # sort for determinism
filespaces = []
for fsoid in fsOids:
if fsoid not in [SYSTEM_FILESPACE]:
filespaces.append("%d+%s" % (fsoid, self.__filespaces[fsoid]))
return '%d|%d|%s|%s|%s|%s|%s|%s|%d|%s|%s|%s|%s' % (
self.dbid,
self.content,
self.role,
self.preferred_role,
self.mode,
self.status,
self.hostname,
self.address,
self.port,
self.replicationPort,
self.datadir,
','.join(filespaces), # this is rather ugly
','.join(self.catdirs) if self.catdirs else []
)
# --------------------------------------------------------------------
@staticmethod
def initFromString(s):
"""
Factory method, initializes a GpDB object from string representation.
- Used when importing from file format.
- TODO: Should be compatable with repr() formatting.
"""
tup = s.strip().split('|')
# Old format: 8 fields
# Todo: remove the need for this, or rework it to be cleaner
if len(tup) == 8:
# This describes the gp_configuration catalog (pre 3.4)
content = int(tup[0])
definedprimary = tup[1]
dbid = int(tup[2])
isprimary = tup[3]
valid = tup[4]
address = tup[5]
port = int(tup[6])
datadir = tup[7]
# Calculate new fields from old ones
#
# Note: this should be kept in sync with the code in
# GpArray.InitFromCatalog() code for initializing old catalog
# formats.
preferred_role = ROLE_PRIMARY if definedprimary else ROLE_MIRROR
role = ROLE_PRIMARY if isprimary else ROLE_MIRROR
hostname = None
mode = MODE_SYNCHRONIZED # ???
status = STATUS_UP if valid else STATUS_DOWN
replicationPort = None
filespaces = ""
catdirs = ""
# Catalog 3.4 format: 12 fields
elif len(tup) == 12:
# This describes the gp_segment_configuration catalog (3.4)
dbid = int(tup[0])
content = int(tup[1])
role = tup[2]
preferred_role = tup[3]
mode = tup[4]
status = tup[5]
hostname = tup[6]
address = tup[7]
port = int(tup[8])
replicationPort = tup[9]
datadir = tup[10] # from the pg_filespace_entry table
filespaces = tup[11]
catdirs = ""
# Catalog 4.0+: 13 fields
elif len(tup) == 13:
# This describes the gp_segment_configuration catalog (3.4+)
dbid = int(tup[0])
content = int(tup[1])
role = tup[2]
preferred_role = tup[3]
mode = tup[4]
status = tup[5]
hostname = tup[6]
address = tup[7]
port = int(tup[8])
replicationPort = tup[9]
datadir = tup[10] # from the pg_filespace_entry table
filespaces = tup[11]
catdirs = tup[12]
else:
raise Exception("GpDB unknown input format: %s" % s)
# Initialize segment without filespace information
gpdb = GpDB(content = content,
preferred_role = preferred_role,
dbid = dbid,
role = role,
mode = mode,
status = status,
hostname = hostname,
address = address,
port = port,
datadir = datadir,
replicationPort = replicationPort)
# Add in filespace information, if present
for fs in filespaces.split(","):
if fs == "":
continue
(fsoid, fselocation) = fs.split("+")
gpdb.addSegmentFilespace(fsoid, fselocation)
# Add Catalog Dir, if present
gpdb.catdirs = []
for d in catdirs.split(","):
if d == "":
continue
gpdb.catdirs.append(d)
# Return the completed segment
return gpdb
# --------------------------------------------------------------------
@staticmethod
def getDataDirPrefix(datadir):
retValue = ""
retValue = datadir[:datadir.rfind('/')]
return retValue
# --------------------------------------------------------------------
@staticmethod
def getFileSpaceDirsWithNewSuffix(fileSpaceDictionary, suffix, includeSystemFilespace = True):
"""
This method will take the a dictionary of file spaces and return the same dictionary with the new sufix.
"""
retValue = {}
for entry in fileSpaceDictionary:
if entry == SYSTEM_FILESPACE and includeSystemFilespace == False:
continue
newDir = GpDB.getDataDirPrefix(fileSpaceDictionary[entry])
newDir = newDir + "/" + suffix
retValue[entry] = newDir
return retValue
# --------------------------------------------------------------------
@staticmethod
def replaceFileSpaceContentID(fileSpaceDictionary, oldContent, newContent):
retValue = {}
for entry in fileSpaceDictionary:
tempDir = fileSpaceDictionary[entry]
tempDir = tempDir[:tempDir.rfind(str(oldContent))]
tempDir += ('%d' % newContent)
retValue[entry] = tempDir
return retValue
# --------------------------------------------------------------------
def copy(self):
"""
Creates a copy of the segment, shallow for everything except the filespaces map
"""
res = copy.copy(self)
res.__filespaces = copy.copy(self.__filespaces)
return res
# --------------------------------------------------------------------
def createTemplate(self, dstDir):
"""
Create a tempate given the information in this GpDB.
"""
# Make sure we have enough room in the dstDir to fit the segment and its filespaces.
requiredSize = DiskUsage.get_size( name = "srcDir"
, remote_host = self.address
, directory = dstDir
)
name = "segcopy filespace get_size"
for oid in self.__filespaces:
if oid == SYSTEM_FILESPACE:
continue
dir = self.__filespaces[oid]
size = DiskUsage.get_size(name, self.address, dir)
requiredSize = requiredSize + size
dstBytesAvail = DiskFree.get_size_local(name = "Check for available free space for segment template", directory = dstDir)
if dstBytesAvail <= requiredSize:
raise Exception("Not enough space on directory: '%s'. Currently %d bytes free but need %d bytes." % (dstDir, int(dstBytesAvail), int(requiredSize)))
logger.info("Starting copy of segment dbid %d to location %s" % (int(self.getSegmentDbId()), dstDir))
# scp does not support fifo files, so tar it first, then copy it, and finally untar it.
templateTarFileDir = '/tmp'
templateTarFileName = 'hawq_template' + time.strftime("%Y%m%d_%H%M%S")
dstTarFile = templateTarFileDir + '/' + templateTarFileName
# tar runs on the remote template segment.
tarCmd = CreateTar("Tar data direcotry", self.getSegmentDataDirectory(), dstTarFile, REMOTE, self.address, 'pgsql_tmp')
tarCmd.run(validateAfter = True)
res = tarCmd.get_results()
# scp tar file to the master
cpCmd = Scp("Copy system data directory tar", dstTarFile, dstDir, self.address, None, True)
cpCmd.run(validateAfter = True)
res = cpCmd.get_results()
untarCmd = ExtractTar("Untar data directory", dstDir + '/' + templateTarFileName, dstDir)
untarCmd.run(validateAfter = True)
res = untarCmd.get_results()
rmCmd = RemoveFiles('Remove data directory tar', dstDir + '/' + templateTarFileName)
rmCmd.run(validateAfter = True)
res = rmCmd.get_results()
# We need 700 permissions or postgres won't start
Chmod.local('set template permissions', dstDir, '0700')
# --------------------------------------------------------------------
# Six simple helper functions to identify what role a segment plays:
# + QD (Query Dispatcher)
# + master
# + standby master
# + QE (Query Executor)
# + primary
# + mirror
# --------------------------------------------------------------------
def isSegmentQD(self):
return self.content < 0
def isSegmentMaster(self, current_role=False):
role = self.role if current_role else self.preferred_role
return self.content < 0 and role == ROLE_PRIMARY
def isSegmentStandby(self, current_role=False):
role = self.role if current_role else self.preferred_role
return self.content < 0 and role == ROLE_MIRROR
def isSegmentQE(self):
return self.content >= 0
def isSegmentPrimary(self, current_role=False):
role = self.role if current_role else self.preferred_role
return self.content >= 0 and role == ROLE_PRIMARY
def isSegmentMirror(self, current_role=False):
role = self.role if current_role else self.preferred_role
return self.content >= 0 and role == ROLE_MIRROR
def isSegmentUp(self):
return self.status == STATUS_UP
def isSegmentDown(self):
return self.status == STATUS_DOWN
def isSegmentModeInChangeLogging(self):
return self.mode == MODE_CHANGELOGGING
def isSegmentModeSynchronized(self):
return self.mode == MODE_SYNCHRONIZED
def isSegmentModeInResynchronization(self):
return self.mode == MODE_RESYNCHRONIZATION
# --------------------------------------------------------------------
# getters
# --------------------------------------------------------------------
def getSegmentDbId(self):
return checkNotNone("dbId", self.dbid)
def getSegmentContentId(self):
return checkNotNone("contentId", self.content)
def getSegmentRole(self):
return checkNotNone("role", self.role)
def getSegmentPreferredRole(self):
return checkNotNone("preferredRole", self.preferred_role)
def getSegmentMode(self):
return checkNotNone("mode", self.mode)
def getSegmentStatus(self):
return checkNotNone("status", self.status)
def getSegmentPort(self):
"""
Returns the listening port for the postmaster for this segment.
Note: With file replication the postmaster will not be active for
mirrors so nothing will be listening on this port, instead the
"replicationPort" is used for primary-mirror communication.
"""
return checkNotNone("port", self.port)
def getSegmentReplicationPort(self):
"""
Returns the replicationPort for the segment, this is the port used for
communication between the primary and mirror for file replication.
Note: is Nullable (so can return None)
"""
return self.replicationPort
def getSegmentHostName(self):
"""
Returns the actual `hostname` for the host
Note: use getSegmentAddress for the network address to use
"""
return self.hostname
def getSegmentAddress(self):
"""
Returns the network address to use to contact the segment (i.e. the NIC address).
"""
return self.address
def getSegmentDataDirectory(self):
"""
Return the primary datadirectory location for the segment.
Note: the datadirectory is just one of the filespace locations
associated with the segment, calling code should be carefull not
to assume that this is the only directory location for this segment.
Todo: evaluate callers of this function to see if they should really
be dealing with a list of filespaces.
"""
return checkNotNone("dataDirectory", self.datadir)
def getSegmentFilespaces(self):
"""
Returns the filespace dictionary of oid->path pairs
"""
return self.__filespaces
# --------------------------------------------------------------------
# setters
# --------------------------------------------------------------------
def setSegmentDbId(self, dbId):
checkNotNone("dbId", dbId)
self.dbid = dbId
def setSegmentContentId(self, contentId):
checkNotNone("contentId", contentId)
if contentId < -1:
raise Exception("Invalid content id %s" % contentId)
self.content = contentId
def setSegmentRole(self, role):
checkNotNone("role", role)
if role not in VALID_ROLES:
raise Exception("Invalid role '%s'" % role)
self.role = role
def setSegmentPreferredRole(self, preferredRole):
checkNotNone("preferredRole", preferredRole)
if preferredRole not in VALID_ROLES:
raise Exception("Invalid preferredRole '%s'" % preferredRole)
self.preferred_role = preferredRole
def setSegmentMode(self, mode):
checkNotNone("mode", mode)
if not mode in VALID_MODE:
raise Exception("Invalid mode '%s'" % mode)
self.mode = mode
def setSegmentStatus(self, status):
checkNotNone("status", status)
if status not in VALID_STATUS:
raise Exception("Invalid status '%s'" % status)
self.status = status
def setSegmentPort(self, port):
checkNotNone("port", port)
checkIsInt("port", port)
self.port = port
def setSegmentReplicationPort(self, replicationPort):
# None is allowed -- don't check nonNone
if replicationPort is not None:
checkIsInt("replicationPort", replicationPort)
self.replicationPort = replicationPort
def setSegmentHostName(self, hostName):
# None is allowed -- don't check
self.hostname = hostName
def setSegmentAddress(self, address):
# None is allowed -- don't check
self.address = address
def setSegmentDataDirectory(self, dataDirectory):
checkNotNone("dataDirectory", dataDirectory)
self.datadir = dataDirectory
def addSegmentFilespace(self, oid, path):
"""
Add a filespace path for this segment.
Throws:
Exception - if a path has already been specified for this segment.
"""
# gpfilespace adds a special filespace with oid=None to indicate
# the filespace that it is currently building, since the filespace
# does not yet exist there is no valid value that could be used.
if oid == None:
if self.__pending_filespace:
raise Exception("Duplicate filespace path for dbid %d" %
self.dbid)
self.__pending_filespace = path
return
# oids should always be integer values > 0
oid = int(oid)
assert(oid > 0)
# The more usual case just sets the filespace in the filespace
# dictionary
if oid in self.__filespaces:
raise Exception("Duplicate filespace path for "
"dbid %d filespace %d" % (self.dbid, oid))
self.__filespaces[oid] = path
def getSegmentPendingFilespace(self):
"""
Returns the pending filespace location for this segment
(called by gpfilespace)
"""
return self.__pending_filespace
# ============================================================================
class Segment:
"""
Used to represent all of the SegmentDBs with the same contentID. Today this
can be at most a primary SegDB and a single mirror SegDB. In the future we
will most likely support multiple mirror segDBs.
Note: This class seems to complicate the implementation of gparray, without
adding much value. Perhaps it should be removed.
"""
primaryDB=None
mirrorDBs=None
# --------------------------------------------------------------------
def __init__(self):
self.mirrorDBs=[]
pass
# --------------------------------------------------------------------
def __str__(self):
return "(Primary: %s, Mirrors: [%s])" % (str(self.primaryDB),
','.join([str(segdb) for segdb in self.mirrorDBs]))
# --------------------------------------------------------------------
def addPrimary(self,segDB):
self.primaryDB=segDB
def addMirror(self,segDB):
self.mirrorDBs.append(segDB)
# --------------------------------------------------------------------
def get_dbs(self):
dbs=[]
if self.primaryDB is not None: # MPP-10886 don't add None to result list
dbs.append(self.primaryDB)
if len(self.mirrorDBs) > 0:
dbs.extend(self.mirrorDBs)
return dbs
# --------------------------------------------------------------------
def get_hosts(self):
hosts=[]
hosts.append(self.primaryDB.hostname)
for m in self.mirrorDBs:
hosts.append(m.hostname)
return hosts
def is_segment_pair_valid(self):
"""Validates that the primary/mirror pair are in a valid state"""
for mirror_db in self.mirrorDBs:
prim_status = self.primaryDB.getSegmentStatus()
prim_mode = self.primaryDB.getSegmentMode()
mirror_status = mirror_db.getSegmentStatus()
mirror_role = mirror_db.getSegmentMode()
if (prim_status, prim_mode, mirror_status, mirror_role) not in VALID_SEGMENT_STATES:
return False
return True
# --------------------------------------------------------------------
# --------------------------------------------------------------------
class SegmentRow():
def __init__(self, content, isprimary, dbid, host, address, port, fulldir, prPort, fileSpaceDictionary = None):
self.content = content
self.isprimary = isprimary
self.dbid = dbid
self.host = host
self.address = address
self.port = port
self.fulldir = fulldir
self.prPort = prPort
self.fileSpaceDictionary = fileSpaceDictionary
def __str__(self):
retVal = "" + \
"content = " + str(self.content) + "\n" + \
"isprimary =" + str(self.isprimary) + "\n" + \
"dbid = " + str(self.dbid) + "\n" + \
"host = " + str(self.host) + "\n" + \
"address = " + str(self.address) + "\n" + \
"port = " + str(self.port) + "\n" + \
"fulldir = " + str(self.fulldir) + "\n" + \
"prPort = " + str(self.prPort) + "\n" + \
"fileSpaceDictionary = " + str(self.fileSpaceDictionary) + "\n" + "\n"
def createSegmentRows( hostlist
, interface_list
, primary_list
, primary_portbase
, mirror_type
, mirror_list
, mirror_portbase
, dir_prefix
, primary_replication_portbase
, mirror_replication_portbase
, primary_fs_list = None
, mirror_fs_list = None
):
"""
This method will return a list of SegmentRow objects that represent new segments on each host.
The "hostlist" parameter contains both existing hosts as well as any new hosts that are
a result of expansion.
"""
rows =[]
dbid = 0
content = 0
for host in hostlist:
isprimary='t'
port=primary_portbase
prPort = primary_replication_portbase
index = 0
for pdir in primary_list:
fulldir = "%s/%s%d" % (pdir,dir_prefix,content)
if len(interface_list) > 0:
interfaceNumber = interface_list[index % len(interface_list)]
address = host + '-' + str(interfaceNumber)
else:
address = host
fsDict = {}
if primary_fs_list != None and len(primary_fs_list) > index:
fsDict = primary_fs_list[index]
fullFsDict = {}
for oid in fsDict:
fullFsDict[oid] = "%s/%s%d" % (fsDict[oid], dir_prefix, content)
rows.append( SegmentRow( content = content
, isprimary = isprimary
, dbid = dbid
, host = host
, address = address
, port = port
, fulldir = fulldir
, prPort = prPort
, fileSpaceDictionary = fullFsDict
) )
port += 1
if prPort != None:
prPort += 1
content += 1
dbid += 1
index = index + 1
#mirrors
if mirror_type is None or mirror_type == 'none':
return rows
elif mirror_type.lower().strip() == 'spread':
#TODO: must be sure to put mirrors on a different subnet than primary.
# this is a general problem for GPDB these days. perhaps we should
# add something to gpdetective to be able to detect this and fix it.
# best to have the interface mapping stuff 1st.
content=0
isprimary='f'
num_hosts = len(hostlist)
num_dirs=len(primary_list)
if num_hosts <= num_dirs:
raise Exception("Not enough hosts for spread mirroring. You must have more hosts than primary segments per host")
mirror_port = {}
mirror_replication_port = {}
mirror_host_offset=1
last_mirror_offset=1
for host in hostlist:
mirror_host_offset = last_mirror_offset + 1
last_mirror_offset += 1
index = 0
for mdir in mirror_list:
fulldir = "%s/%s%d" % (mdir,dir_prefix,content)
fsDict = {}
if mirror_fs_list != None and len(mirror_fs_list) > index:
fsDict = mirror_fs_list[index]
fullFsDict = {}
for oid in fsDict:
fullFsDict[oid] = "%s/%s%d" % (fsDict[oid], dir_prefix, content)
mirror_host = hostlist[mirror_host_offset % num_hosts]
if mirror_host == host:
mirror_host_offset += 1
mirror_host = hostlist[mirror_host_offset % num_hosts]
if len(interface_list) > 0:
interfaceNumber = interface_list[mirror_host_offset % len(interface_list)]
address = mirror_host + '-' + str(interfaceNumber)
else:
address = mirror_host
if not mirror_port.has_key(mirror_host):
mirror_port[mirror_host] = mirror_portbase
if not mirror_replication_port.has_key(mirror_host):
mirror_replication_port[mirror_host] = mirror_replication_portbase
rows.append( SegmentRow( content = content
, isprimary = isprimary
, dbid = dbid
, host = mirror_host
, address = address
, port = mirror_port[mirror_host]
, fulldir = fulldir
, prPort = mirror_replication_port[mirror_host]
, fileSpaceDictionary = fullFsDict
) )
mirror_port[mirror_host] += 1
mirror_replication_port[mirror_host] += 1
content += 1
dbid += 1
mirror_host_offset += 1
index = index + 1
elif mirror_type.lower().strip() == 'grouped':
content = 0
num_hosts = len(hostlist)
if num_hosts < 2:
raise Exception("Not enough hosts for grouped mirroring. You must have at least 2")
#we'll pick our mirror host to be 1 host "ahead" of the primary.
mirror_host_offset = 1
isprimary='f'
for host in hostlist:
mirror_host = hostlist[mirror_host_offset % num_hosts]
mirror_host_offset += 1
port = mirror_portbase
mrPort = mirror_replication_portbase
index = 0
for mdir in mirror_list:
fulldir = "%s/%s%d" % (mdir,dir_prefix,content)
if len(interface_list) > 0:
interfaceNumber = interface_list[(index + 1) % len(interface_list)]
address = mirror_host + '-' + str(interfaceNumber)
else:
address = mirror_host
fsDict = {}
if mirror_fs_list != None and len(mirror_fs_list) > index:
fsDict = mirror_fs_list[index]
fullFsDict = {}
for oid in fsDict:
fullFsDict[oid] = "%s/%s%d" % (fsDict[oid], dir_prefix, content)
rows.append( SegmentRow( content = content
, isprimary = isprimary
, dbid = dbid
, host = mirror_host
, address = address
, port = port
, fulldir = fulldir
, prPort = mrPort
, fileSpaceDictionary = fullFsDict
) )
port += 1
mrPort +=1
content += 1
dbid += 1
index = index + 1
else:
raise Exception("Invalid mirror type specified: %s" % mirror_type)
return rows
#========================================================================
def createSegmentRowsFromSegmentList( newHostlist
, interface_list
, primary_segment_list
, primary_portbase
, mirror_type
, mirror_segment_list
, mirror_portbase
, dir_prefix
, primary_replication_portbase
, mirror_replication_portbase
):
"""
This method will return a list of SegmentRow objects that represent an expansion of existing
segments on new hosts.
"""
rows = []
dbid = 0
content = 0
interfaceDict = {}
for host in newHostlist:
isprimary='t'
port=primary_portbase
prPort = primary_replication_portbase
index = 0
for pSeg in primary_segment_list:
if len(interface_list) > 0:
interfaceNumber = interface_list[index % len(interface_list)]
address = host + '-' + str(interfaceNumber)
interfaceDict[content] = index % len(interface_list)
else:
address = host
newFulldir = "%s/%s%d" % (GpDB.getDataDirPrefix(pSeg.getSegmentDataDirectory()), dir_prefix, content)
newFileSpaceDictionary = GpDB.getFileSpaceDirsWithNewSuffix(pSeg.getSegmentFilespaces(), dir_prefix + str(content), includeSystemFilespace = False)
rows.append( SegmentRow( content = content
, isprimary = isprimary
, dbid = dbid
, host = host
, address = address
, port = port
, fulldir = newFulldir
, prPort = prPort
, fileSpaceDictionary = newFileSpaceDictionary
) )
port += 1
if prPort != None:
prPort += 1
content += 1
dbid += 1
index += 1
#mirrors
if mirror_type is None or mirror_type == 'none':
return rows
elif mirror_type.lower().strip() == 'spread':
content=0
isprimary='f'
num_hosts = len(newHostlist)
num_dirs=len(primary_segment_list)
if num_hosts <= num_dirs:
raise Exception("Not enough hosts for spread mirroring. You must have more hosts than primary segments per host")
mirror_port = {}
mirror_replication_port = {}
mirror_host_offset=1
last_mirror_offset=0
for host in newHostlist:
mirror_host_offset = last_mirror_offset + 1
last_mirror_offset += 1
for mSeg in mirror_segment_list:
newFulldir = "%s/%s%d" % (GpDB.getDataDirPrefix(mSeg.getSegmentDataDirectory()), dir_prefix, content)
newFileSpaceDictionary = GpDB.getFileSpaceDirsWithNewSuffix(mSeg.getSegmentFilespaces(), dir_prefix + str(content), includeSystemFilespace = False)
mirror_host = newHostlist[mirror_host_offset % num_hosts]
if mirror_host == host:
mirror_host_offset += 1
mirror_host = newHostlist[mirror_host_offset % num_hosts]
if len(interface_list) > 0:
interfaceNumber = interface_list[(interfaceDict[content] + 1) % len(interface_list)]
address = mirror_host + '-' + str(interfaceNumber)
else:
address = mirror_host
if not mirror_port.has_key(mirror_host):
mirror_port[mirror_host] = mirror_portbase
if not mirror_replication_port.has_key(mirror_host):
mirror_replication_port[mirror_host] = mirror_replication_portbase
rows.append( SegmentRow( content = content
, isprimary = isprimary
, dbid = dbid
, host = mirror_host
, address = address
, port = mirror_port[mirror_host]
, fulldir = newFulldir
, prPort = mirror_replication_port[mirror_host]
, fileSpaceDictionary = newFileSpaceDictionary
) )
mirror_port[mirror_host] += 1
mirror_replication_port[mirror_host] += 1
content += 1
dbid += 1
mirror_host_offset += 1
elif mirror_type.lower().strip() == 'grouped':
content = 0
num_hosts = len(newHostlist)
if num_hosts < 2:
raise Exception("Not enough hosts for grouped mirroring. You must have at least 2")
#we'll pick our mirror host to be 1 host "ahead" of the primary.
mirror_host_offset = 1
isprimary='f'
for host in newHostlist:
mirror_host = newHostlist[mirror_host_offset % num_hosts]
mirror_host_offset += 1
port = mirror_portbase
mrPort = mirror_replication_portbase
index = 0
for mSeg in mirror_segment_list:
if len(interface_list) > 0:
interfaceNumber = interface_list[(interfaceDict[content] + 1) % len(interface_list)]
address = mirror_host + '-' + str(interfaceNumber)
else:
address = mirror_host
newFulldir = "%s/%s%d" % (GpDB.getDataDirPrefix(mSeg.getSegmentDataDirectory()), dir_prefix, content)
newFileSpaceDictionary = GpDB.getFileSpaceDirsWithNewSuffix(mSeg.getSegmentFilespaces(), dir_prefix + str(content), includeSystemFilespace = False)
rows.append( SegmentRow( content = content
, isprimary = isprimary
, dbid = dbid
, host = mirror_host
, address = address
, port = port
, fulldir = newFulldir
, prPort = mrPort
, fileSpaceDictionary = newFileSpaceDictionary
) )
port += 1
mrPort +=1
content += 1
dbid += 1
index = index + 1
else:
raise Exception("Invalid mirror type specified: %s" % mirror_type)
return rows
#========================================================================
def parseTrueFalse(value):
if value.lower() == 'f':
return False
elif value.lower() == 't':
return True
raise Exception('Invalid true/false value')
# TODO: Destroy this (MPP-7686)
# Now that "hostname" is a distinct field in gp_segment_configuration
# attempting to derive hostnames from interaface names should be eliminated.
#
def get_host_interface(full_hostname):
(host_part, inf_num) = ['-'.join(full_hostname.split('-')[:-1]),
full_hostname.split('-')[-1]]
if host_part == '' or not inf_num.isdigit():
return (full_hostname, None)
else:
return (host_part, inf_num)
class GpFilesystemObj:
"""
List information for a filesystem, as stored in pg_filesystem
"""
def __init__(self, oid, name, shared):
self.__oid = oid
self.__name = name
self.__shared = shared
def getOid(self):
return self.__oid
def getName(self):
return self.__name
def isShared(self):
return self.__shared == True
@staticmethod
def getFilesystemObj(filesystemArr, fsoid):
# local storage
if fsoid == 0:
return None
# plugin storage
for fsys in filesystemArr:
if (fsys.getOid() == fsoid):
return fsys
raise Exception("Error: invalid file system oid %d" % (fsoid))
class GpFilespaceObj:
"""
List information for a filespace, as stored in pg_filespace
"""
def __init__(self, oid, name, fsys):
self.__oid = oid
self.__name = name
self.__fsys = fsys
def getOid(self):
return self.__oid
def getName(self):
return self.__name
def getFsys(self):
return self.__fsys
def isSystemFilespace(self):
return self.__oid == SYSTEM_FILESPACE
# ============================================================================
class GpArray:
"""
GpArray is a python class that describes a Greenplum array.
A Greenplum array consists of:
master - The primary QD for the array
standby master - The mirror QD for the array [optional]
segment array - an array of segments within the cluster
Each segment is either a single GpDB object, or a primary/mirror pair.
It can be initialized either from a database connection, in which case
it discovers the configuration information by examining the catalog, or
via a configuration file.
"""
# --------------------------------------------------------------------
def __init__(self, segments, segmentsAsLoadedFromDb=None, strategyLoadedFromDb=None):
"""
segmentsInDb is used only be the configurationImpl* providers; it is used to track the state of the
segments in the database
TODO:
"""
self.master = None
self.standbyMaster = None
self.segments = []
self.expansionSegments=[]
self.numPrimarySegments = 0
self.recoveredSegmentDbids = []
self.__version = None
self.__segmentsAsLoadedFromDb = segmentsAsLoadedFromDb
self.__strategyLoadedFromDb = strategyLoadedFromDb
self.__strategy = FAULT_STRATEGY_NONE
self.san_mount_by_dbid = {}
self.san_mounts = {}
self.setFilespaces([])
for segdb in segments:
# Handle QD nodes
if segdb.isSegmentMaster(True):
if self.master != None:
logger.error("multiple master dbs defined")
raise Exception("GpArray - multiple master dbs defined")
self.master = segdb
elif segdb.isSegmentStandby(True):
if self.standbyMaster != None:
logger.error("multiple standby master dbs defined")
raise Exception("GpArray - multiple standby master dbs defined")
self.standbyMaster = segdb
# Handle regular segments
elif segdb.isSegmentQE():
self.addSegmentDb(segdb)
else:
# Not a master, standbymaster, primary, or mirror?
# shouldn't even be possible.
logger.error("FATAL - invalid dbs defined")
raise Exception("Error: GpArray() - invalid dbs defined")
# Make sure we have a master db
if self.master is None:
logger.error("FATAL - no master dbs defined!")
raise Exception("Error: GpArray() - no master dbs defined")
def __str__(self):
return "Master: %s\nStandby: %s\nSegments: %s" % (str(self.master),
str(self.standbyMaster) if self.standbyMaster else 'Not Configured',
"\n".join([str(seg) for seg in self.segments]))
def addSegmentDb(self, segdb):
content = segdb.getSegmentContentId()
while len(self.segments) <= content:
self.segments.insert(content, Segment())
seg = self.segments[content]
if segdb.isSegmentPrimary(True):
seg.addPrimary(segdb)
self.numPrimarySegments += 1
else:
seg.addMirror(segdb)
# --------------------------------------------------------------------
def isStandardArray(self):
"""
This method will check various aspects of the array to see if it looks like a standard
setup. It returns two values:
True or False depending on if the array looks like a standard array.
If message if the array does not look like a standard array.
"""
try:
# Do all the segments contain the same number of primary and mirrors.
firstNumPrimaries = 0
firstNumMirrors = 0
firstHost = ""
first = True
dbList = self.getDbList(includeExpansionSegs = True)
gpdbByHost = self.getSegmentsByHostName(dbList)
for host in gpdbByHost:
gpdbList = gpdbByHost[host]
if len(gpdbList) == 1 and gpdbList[0].isSegmentQD() == True:
# This host has one master segment and nothing else
continue
if len(gpdbList) == 2 and gpdbList[0].isSegmentQD() and gpdbList[1].isSegmentQD():
# This host has the master segment and its mirror and nothing else
continue
numPrimaries = 0
numMirrors = 0
for gpdb in gpdbList:
if gpdb.isSegmentQD() == True:
continue
if gpdb.isSegmentPrimary() == True:
numPrimaries = numPrimaries + 1
else:
numMirrors = numMirrors + 1
if first == True:
firstNumPrimaries = numPrimaries
firstNumMirrors = numMirrors
firstHost = host
first = False
if numPrimaries != firstNumPrimaries:
raise Exception("The number of primary segments is not consistent across all nodes: %s != %s." % (host, firstHost))
elif numMirrors != firstNumMirrors:
raise Exception("The number of mirror segments is not consistent across all nodes. %s != %s." % (host, firstHost))
# Make sure the address all have the same suffix "-<n>" (like -1, -2, -3...)
firstSuffixList = []
first = True
suffixList = []
for host in gpdbByHost:
gpdbList = gpdbByHost[host]
for gpdb in gpdbList:
if gpdb.isSegmentMaster() == True:
continue
address = gpdb.getSegmentAddress()
if address == host:
if len(suffixList) == 0:
continue
else:
raise Exception("The address value for %s is the same as the host name, but other addresses on the host are not." % address)
suffix = address.split('-')[-1]
if suffix.isdigit() == False:
raise Exception("The address value for %s does not correspond to a standard address." % address)
suffixList.append(suffix)
suffixList.sort()
if first == True:
firstSuffixList = suffixList
first = False
if suffixList != firstSuffixList:
raise Exception("The address list for %s doesn't not have the same pattern as %s." % (str(suffixList), str(firstSuffixList)))
except Exception, e:
# Assume any exception implies a non-standard array
return False, str(e)
return True, ""
# --------------------------------------------------------------------
@staticmethod
def initFromCatalog(dbURL, utility=False, useAllSegmentFileSpaces=False):
"""
Factory method, initializes a GpArray from provided database URL
Please note that -
useAllSegmentFilespaces when set to true makes this method add *all* filespaces
to the segments of gparray. If false, only returns Master/Standby all filespaces
This is *hacky* and we know that it is not the right way to design methods/interfaces
We are doing this so that we do not affect behavior of existing tools like upgrade, gprecoverseg etc
"""
conn = dbconn.connect(dbURL, utility)
# Get the version from the database:
version_str = None
for row in dbconn.execSQL(conn, "SELECT version()"):
version_str = row[0]
version = GpVersion(version_str)
if version.getVersionRelease() in ("3.0", "3.1", "3.2", "3.3"):
# In older releases we get the fault strategy using the
# gp_fault_action guc.
strategy_rows = dbconn.execSQL(conn, "show gp_fault_action")
# Note: Mode may not be "right", certainly 4.0 concepts of mirroring
# mode do not apply to 3.x, so it depends on how the scripts are
# making use of mode. For now it is initialized to synchronized.
#
# Note: hostname is initialized to null since the catalog does not
# contain this information. Initializing a hostcache using the
# resulting gparray will automatically fill in a value for hostname.
#
# Note: this should be kept in sync with the code in
# GpDB.InitFromString() code for initializing old catalog formats.
config_rows = dbconn.execSQL(conn, '''
SELECT registration_order,
role,
status,
null as hostname,
hostname as address,
port,
%s as fsoid,
datadir as fselocation
FROM pg_catalog.gp_configuration
ORDER BY registration_order
''' % str(SYSTEM_FILESPACE))
# No SAN support in these older releases.
san_segs_rows = []
san_rows = []
# no filespace support in older releases.
filespaceArr = []
# no filesystem support in older releases.
else:
strategy_rows = dbconn.execSQL(conn, '''
SELECT fault_strategy FROM gp_fault_strategy
''')
config_rows = dbconn.execSQL(conn, '''
SELECT dbid, content, role, preferred_role, mode, status,
hostname, address, port, replication_port, fs.oid,
fselocation
FROM pg_catalog.gp_segment_configuration
JOIN pg_catalog.pg_filespace_entry on (dbid = fsedbid)
JOIN pg_catalog.pg_filespace fs on (fsefsoid = fs.oid)
ORDER BY content, preferred_role DESC, fs.oid
''')
san_segs_rows = dbconn.execSQL(conn, '''
SELECT dbid, content, status, unnest(san_mounts)
FROM pg_catalog.gp_segment_configuration
WHERE content >= 0
ORDER BY content, dbid
''')
san_rows = dbconn.execSQL(conn, '''
SELECT mountid, active_host, san_type,
primary_host, primary_mountpoint, primary_device,
mirror_host, mirror_mountpoint, mirror_device
FROM pg_catalog.gp_san_configuration
ORDER BY mountid
''')
# All of filesystem is shared storage
filesystemRows = dbconn.execSQL(conn, '''
SELECT oid, fsysname, true AS fsysshared
FROM pg_filesystem
ORDER BY fsysname
''')
filesystemArr = [GpFilesystemObj(fsysRow[0], fsysRow[1], fsysRow[2]) for fsysRow in filesystemRows]
filespaceRows = dbconn.execSQL(conn, '''
SELECT oid, fsname, fsfsys AS fsoid
FROM pg_filespace
ORDER BY fsname;
''')
filespaceArr = [GpFilespaceObj(fsRow[0], fsRow[1], GpFilesystemObj.getFilesystemObj(filesystemArr, fsRow[2])) for fsRow in filespaceRows]
# Todo: add checks that all segments should have the same filespaces?
recoveredSegmentDbids = []
segments = []
seg = None
for row in config_rows:
# Extract fields from the row
(dbid, content, role, preferred_role, mode, status, hostname,
address, port, replicationPort, fsoid, fslocation) = row
# If we have segments which have recovered, record them.
if preferred_role != role and content >= 0:
if mode == MODE_SYNCHRONIZED and status == STATUS_UP:
recoveredSegmentDbids.append(dbid)
# In GPSQL, only master maintain the filespace information.
if content != MASTER_CONTENT_ID and fsoid != SYSTEM_FILESPACE and not useAllSegmentFileSpaces:
continue
# The query returns all the filespaces for a segment on separate
# rows. If this row is the same dbid as the previous row simply
# add this filespace to the existing list, otherwise create a
# new segment.
if seg and seg.getSegmentDbId() == dbid:
seg.addSegmentFilespace(fsoid, fslocation)
else:
seg = GpDB(content, preferred_role, dbid, role, mode, status,
hostname, address, port, fslocation, replicationPort)
segments.append(seg)
for seg in segments:
datcatloc = dbconn.execSQL(conn, '''
select fsloc.fselocation || '/' ||
case when db.dattablespace = 1663
then 'base'
else db.dattablespace::text
end || '/'||db.oid as catloc
from pg_Database db, pg_tablespace ts,
(SELECT dbid, fs.oid, fselocation
FROM pg_catalog.gp_segment_configuration
JOIN pg_catalog.pg_filespace_entry on (dbid = fsedbid)
JOIN pg_catalog.pg_filespace fs on (fsefsoid = fs.oid)) fsloc
where db.dattablespace = ts.oid
and ts.spcfsoid = fsloc.oid
and fsloc.dbid = %d
''' % seg.dbid)
seg.catdirs = []
for row in datcatloc:
seg.catdirs.append(row[0])
conn.close()
origSegments = [seg.copy() for seg in segments]
if strategy_rows.rowcount == 0:
raise Exception("Database does not contain gp_fault_strategy entry")
if strategy_rows.rowcount > 1:
raise Exception("Database does too many gp_fault_strategy entries")
strategy = strategy_rows.fetchone()[0]
array = GpArray(segments, origSegments, strategy)
array.__version = version
array.recoveredSegmentDbids = recoveredSegmentDbids
array.setFaultStrategy(strategy)
array.setSanConfig(san_rows, san_segs_rows)
array.setFilespaces(filespaceArr)
array.setFilesystem(filesystemArr)
return array
# --------------------------------------------------------------------
@staticmethod
def initFromFile(filename):
"""
Factory method: creates a GpArray from an input file
(called by gpexpand.)
Note: Currently this is only used by the gpexpand rollback facility,
and by gpsuspend utility,
there is currently NO expectation that this file format is saved
on disk in any long term fashion.
Format changes of the file are acceptable until this assumption is
changed, but initFromFile and dumpToFile must be kept in parity.
"""
segdbs=[]
fp = open(filename, 'r')
for line in fp:
segdbs.append(GpDB.initFromString(line))
fp.close()
return GpArray(segdbs)
# --------------------------------------------------------------------
def is_array_valid(self):
"""Checks that each primary/mirror pair is in a valid state"""
for seg in self.segments:
if not seg.is_segment_pair_valid():
return False
return True
# --------------------------------------------------------------------
def dumpToFile(self, filename):
"""
Dumps a GpArray to a file (called by gpexpand)
Note: See notes above for initFromFile()
"""
fp = open(filename, 'w')
for gpdb in self.getDbList():
fp.write(repr(gpdb) + '\n')
fp.close()
# --------------------------------------------------------------------
def setSanConfig(self, san_row_list, san_segs_list):
"""
Sets up the san-config.
san_row_list is essentially the contents of gp_san_config (which may be empty)
san_segs_list is the content of the san_mounts field of gp_segment_configuration
(also potentially empty), unnested.
These are raw results sets. We build two maps:
Map1: from dbid to a list of san-mounts.
Map2: from mount-id to the san_config attributes.
The code below has to match the SQL inside initFromCatalog()
"""
# First collect the "unnested" mount-ids into a list.
dbid_map = {}
for row in san_segs_list:
(dbid, content, status, mountid) = row
if dbid_map.has_key(dbid):
(status, content, mount_list) = dbid_map[dbid]
dbid_map[dbid] = (status, content, mount_list.append(mountid))
else:
dbid_map[dbid] = (status, content, [mountid])
# dbid_map now contains a flat mapping from dbid -> (status, list of mountpoints)
san_map = {}
for row in san_row_list:
(mountid, active, type, p_host, p_mp, p_dev, m_host, m_mp, m_dev) = row
san_map[mountid] = {'active':active, 'type':type,
'primaryhost':p_host, 'primarymountpoint':p_mp, 'primarydevice':p_dev,
'mirrorhost':m_host, 'mirrormountpoint':m_mp, 'mirrordevice':m_dev
}
self.san_mount_by_dbid = dbid_map
self.san_mounts = san_map
# --------------------------------------------------------------------
def getSanConfigMaps(self):
return (self.san_mounts, self.san_mount_by_dbid)
# --------------------------------------------------------------------
def setFaultStrategy(self, strategy):
"""
Sets the fault strategy of the array.
The input strategy should either be a valid fault strategy code:
['n', 'f', ...]
Or it should be a valid fault strategy label:
['none', 'physical mirroring', ...]
The reason that we need to accept both forms of input is that fault
strategy is modeled differently in the catalog depending on the catalog
version.
In 3.x fault strategy is stored using the label via the gp_fault_action
guc.
In 4.0 fault strategy is stored using the code via the gp_fault_strategy
table.
"""
checkNotNone("strategy", strategy)
# Try to lookup the strategy as a label
for (key, value) in FAULT_STRATEGY_LABELS.iteritems():
if value == strategy:
strategy = key
break
if strategy not in VALID_FAULT_STRATEGY:
raise Exception("Invalid fault strategy '%s'" % strategy)
self.__strategy = strategy
# --------------------------------------------------------------------
def getFaultStrategy(self):
"""
Will return a string matching one of the FAULT_STRATEGY_* constants
"""
if self.__strategy not in VALID_FAULT_STRATEGY:
raise Exception("Fault strategy is not set correctly: '%s'" %
self.__strategy)
return self.__strategy
# --------------------------------------------------------------------
def setFilesystem(self, filesystemArr):
"""
@param filesystemArr of GpFilesystemObj objects
"""
self.__filesystemArr = [fsys for fsys in filesystemArr]
def getFilesystem(self):
"""
@return a newly allocated list of GpFilespaceObj objects, will have been sorted by filesystem name
"""
return [fsys for fsys in self.__filesystemArr]
def setFilespaces(self, filespaceArr):
"""
@param filespaceArr of GpFilespaceObj objects
"""
self.__filespaceArr = [fs for fs in filespaceArr]
def getFilespaces(self, includeSystemFilespace=True):
"""
@return a newly allocated list of GpFilespaceObj objects, will have been sorted by filespace name
"""
return [fs for fs in self.__filespaceArr if fs.isSystemFilespace()]
def getNonSystemFilespaces(self):
"""
@return a newly allocated list of GpFilespaceObj objects, will have been sorted by filespace name
"""
return [fs for fs in self.__filespaceArr if not fs.isSystemFilespace()]
def getAllFilespaces(self):
"""
@return a newly allocated list of GpFilespaceObj objects, will have been sorted by filespace name
"""
return [fs for fs in self.__filespaceArr]
# --------------------------------------------------------------
def getFileSpaceName(self, filespaceOid):
retValue = None
if self.__filespaceArr != None:
for entry in self.__filespaceArr:
if entry.getOid() == filespaceOid:
retValue = entry.getName()
break
return retValue
# --------------------------------------------------------------
def getFileSpaceOid(self, filespaceName):
retValue = None
if self.__filespaceArr != None:
for entry in self.__filespaceArr:
if entry.getName() == filespaceName:
retValue = entry.getOid()
break
return retValue
# --------------------------------------------------------------
def isFileSpaceShared(self, filespaceOid):
retValue = False
if self.__filespaceArr != None:
for entry in self.__filespaceArr:
if entry.getOid() == filespaceOid:
retValue = entry.getFsys() != None and entry.getFsys().isShared()
break
return retValue
# --------------------------------------------------------------------
def getDbList(self, includeExpansionSegs=False):
"""
Return a list of all GpDb objects that make up the array
"""
dbs=[]
dbs.append(self.master)
if self.standbyMaster:
dbs.append(self.standbyMaster)
if includeExpansionSegs:
dbs.extend(self.getSegDbList(True))
else:
dbs.extend(self.getSegDbList())
return dbs
# --------------------------------------------------------------------
def allSegmentsAlive(self):
"""
Return True if all segments alive
"""
for seg in self.getSegDbList():
if seg.status == STATUS_DOWN:
return False
return True
# --------------------------------------------------------------------
def getHostList(self, includeExpansionSegs = False):
"""
Return a list of all Hosts that make up the array
"""
hostList = []
hostList.append(self.master.getSegmentHostName())
if self.standbyMaster:
hostList.append(self.standbyMaster.getSegmentHostName())
dbList = self.getDbList(includeExpansionSegs = includeExpansionSegs)
for db in dbList:
if db.getSegmentHostName() in hostList:
continue
else:
hostList.append(db.getSegmentHostName())
return hostList
def getDbIdToPeerMap(self):
"""
Returns a map that maps a dbid to the peer segment for that dbid
"""
contentIdToSegments = {}
for seg in self.getSegDbList():
arr = contentIdToSegments.get(seg.getSegmentContentId())
if arr is None:
arr = []
contentIdToSegments[seg.getSegmentContentId()] = arr
arr.append(seg)
result = {}
for contentId, arr in contentIdToSegments.iteritems():
if len(arr) == 1:
pass
elif len(arr) != 2:
raise Exception("Content %s has more than two segments"% contentId)
else:
result[arr[0].getSegmentDbId()] = arr[1]
result[arr[1].getSegmentDbId()] = arr[0]
return result
# --------------------------------------------------------------------
def getSegDbList(self, includeExpansionSegs=False):
"""Return a list of all GpDb objects for all segments in the array"""
dbs=[]
for seg in self.segments:
dbs.extend(seg.get_dbs())
if includeExpansionSegs:
for seg in self.expansionSegments:
dbs.extend(seg.get_dbs())
return dbs
# --------------------------------------------------------------------
def getSegDbMap(self):
"""
Return a map of all GpDb objects that make up the array.
"""
dbsMap = {}
for db in self.getSegDbList():
dbsMap[db.getSegmentDbId()] = db
return dbsMap
# --------------------------------------------------------------------
def getExpansionSegDbList(self):
"""Returns a list of all GpDb objects that make up the new segments
of an expansion"""
dbs=[]
for seg in self.expansionSegments:
dbs.extend(seg.get_dbs())
return dbs
# --------------------------------------------------------------------
def getSegmentContainingDb(self, db):
for seg in self.segments:
for segDb in seg.get_dbs():
if db.getSegmentDbId() == segDb.getSegmentDbId():
return seg
return None
# --------------------------------------------------------------------
def getExpansionSegmentContainingDb(self, db):
for seg in self.expansionSegments:
for segDb in seg.get_dbs():
if db.getSegmentDbId() == segDb.getSegmentDbId():
return seg
return None
# --------------------------------------------------------------------
def get_invalid_segdbs(self):
dbs=[]
for seg in self.segments:
segdb = seg.primaryDB
if not segdb.valid:
dbs.append(segdb)
for db in seg.mirrorDBs:
if not db.valid:
dbs.append(db)
return dbs
# --------------------------------------------------------------------
def get_synchronized_segdbs(self):
dbs=[]
for seg in self.segments:
segdb = seg.primaryDB
if segdb.mode == MODE_SYNCHRONIZED:
dbs.append(segdb)
for segdb in seg.mirrorDBs:
if segdb.mode == MODE_SYNCHRONIZED:
dbs.append(segdb)
return dbs
# --------------------------------------------------------------------
def get_unbalanced_segdbs(self):
dbs=[]
for seg in self.segments:
for segdb in seg.get_dbs():
if segdb.preferred_role != segdb.role:
dbs.append(segdb)
return dbs
# --------------------------------------------------------------------
def get_unbalanced_primary_segdbs(self):
dbs = [seg for seg in self.get_unbalanced_segdbs() if seg.role == ROLE_PRIMARY]
return dbs
# --------------------------------------------------------------------
def get_inactive_mirrors_segdbs(self):
if self.__strategy != FAULT_STRATEGY_SAN:
return []
dbs=[]
for seg in self.segments:
segdb = seg.primaryDB
for db in seg.mirrorDBs:
dbs.append(db)
return dbs
# --------------------------------------------------------------------
def get_valid_segdbs(self):
dbs=[]
for seg in self.segments:
db = seg.primaryDB
if db.valid:
dbs.append(db)
for db in seg.mirrorDBs:
if db.valid:
dbs.append(db)
return dbs
# --------------------------------------------------------------------
def get_hostlist(self, includeMaster=True):
hosts=[]
if includeMaster:
hosts.append(self.master.hostname)
if self.standbyMaster is not None:
hosts.append(self.standbyMaster.hostname)
for seg in self.segments:
hosts.extend(seg.get_hosts())
return hosts
# --------------------------------------------------------------------
def get_max_dbid(self,includeExpansionSegs=False):
"""Returns the maximum dbid in the array. If includeExpansionSegs
is True, this includes the expansion segment array in the search"""
dbid = 0
for db in self.getDbList(includeExpansionSegs):
if db.getSegmentDbId() > dbid:
dbid = db.getSegmentDbId()
return dbid
# --------------------------------------------------------------------
def get_max_contentid(self, includeExpansionSegs=False):
"""Returns the maximum contentid in the array. If includeExpansionSegs
is True, this includes the expansion segment array in the search"""
content = 0
for db in self.getDbList(includeExpansionSegs):
if db.content > content:
content = db.content
return content
# --------------------------------------------------------------------
def get_segment_count(self):
return len(self.segments)
# --------------------------------------------------------------------
def get_min_primary_port(self):
"""Returns the minimum primary segment db port"""
min_primary_port = self.segments[0].primaryDB.port
for seg in self.segments:
if seg.primaryDB.port < min_primary_port:
min_primary_port = seg.primaryDB.port
return min_primary_port
# --------------------------------------------------------------------
def get_max_primary_port(self):
"""Returns the maximum primary segment db port"""
max_primary_port = self.segments[0].primaryDB.port
for seg in self.segments:
if seg.primaryDB.port > max_primary_port:
max_primary_port = seg.primaryDB.port
return max_primary_port
# --------------------------------------------------------------------
def get_min_mirror_port(self):
"""Returns the minimum mirror segment db port"""
if self.get_mirroring_enabled() is False:
raise Exception('Mirroring is not enabled')
min_mirror_port = self.segments[0].mirrorDBs[0].port
for seg in self.segments:
for db in seg.mirrorDBs:
if db.port < min_mirror_port:
min_mirror_port = db.port
return min_mirror_port
# --------------------------------------------------------------------
def get_max_mirror_port(self):
"""Returns the maximum mirror segment db port"""
if self.get_mirroring_enabled() is False:
raise Exception('Mirroring is not enabled')
max_mirror_port = self.segments[0].mirrorDBs[0].port
for seg in self.segments:
for db in seg.mirrorDBs:
if db.port > max_mirror_port:
max_mirror_port = db.port
return max_mirror_port
# --------------------------------------------------------------------
def get_min_primary_replication_port(self):
"""Returns the minimum primary segment db replication port"""
if self.get_mirroring_enabled() is False:
raise Exception('Mirroring is not enabled')
min_primary_replication_port = self.segments[0].primaryDB.replicationPort
for seg in self.segments:
if seg.primaryDB.replicationPort < min_primary_replication_port:
min_primary_replication_port = seg.primaryDB.replicationPort
return min_primary_replication_port
# --------------------------------------------------------------------
def get_max_primary_replication_port(self):
"""Returns the maximum primary segment db replication port"""
if self.get_mirroring_enabled() is False:
raise Exception('Mirroring is not enabled')
max_primary_replication_port = self.segments[0].primaryDB.replicationPort
for seg in self.segments:
if seg.primaryDB.replicationPort > max_primary_replication_port:
max_primary_replication_port = seg.primaryDB.replicationPort
return max_primary_replication_port
# --------------------------------------------------------------------
def get_min_mirror_replication_port(self):
"""Returns the minimum mirror segment db replication port"""
if self.get_mirroring_enabled() is False:
raise Exception('Mirroring is not enabled')
min_mirror_replication_port = self.segments[0].mirrorDBs[0].replicationPort
for seg in self.segments:
for db in seg.mirrorDBs:
if db.replicationPort < min_mirror_replication_port:
min_mirror_replication_port = db.replicationPort
return min_mirror_replication_port
# --------------------------------------------------------------------
def get_max_mirror_replication_port(self):
"""Returns the maximum mirror segment db replication port"""
if self.get_mirroring_enabled() is False:
raise Exception('Mirroring is not enabled')
max_mirror_replication_port = self.segments[0].mirrorDBs[0].replicationPort
for seg in self.segments:
for db in seg.mirrorDBs:
if db.replicationPort > max_mirror_replication_port:
max_mirror_replication_port = db.replicationPort
return max_mirror_replication_port
# --------------------------------------------------------------------
def get_interface_numbers(self):
"""Returns interface numbers in the array. Assumes that addresses are named
<hostname>-<int_num>. If the nodes just have <hostname> then an empty
array is returned."""
interface_nums = []
primary_hostname = self.segments[0].primaryDB.hostname
primary_address_list = []
dbList = self.getDbList()
for db in dbList:
if db.isSegmentQD() == True:
continue
if db.getSegmentHostName() == primary_hostname:
if db.getSegmentAddress() not in primary_address_list:
primary_address_list.append(db.getSegmentAddress())
for address in primary_address_list:
if address.startswith(primary_hostname) == False or len(primary_hostname) + 2 > len(address):
return []
suffix = address[len(primary_hostname):]
if len(suffix) < 2 or suffix[0] != '-' or suffix[1:].isdigit() == False:
return []
interface_nums.append(suffix[1:])
return interface_nums
# --------------------------------------------------------------------
def get_primary_count(self):
return self.numPrimarySegments
# --------------------------------------------------------------------
def get_mirroring_enabled(self):
"""Returns True if mirrors are defined"""
return len(self.segments[0].mirrorDBs) != 0
# --------------------------------------------------------------------
def get_list_of_primary_segments_on_host(self, hostname):
retValue = []
for db in self.getDbList():
if db.isSegmentPrimary(False) == True and db.getSegmentHostName() == hostname:
retValue.append(db)
return retValue
# --------------------------------------------------------------------
def get_list_of_mirror_segments_on_host(self, hostname):
retValue = []
for db in self.getDbList():
if db.isSegmentMirror(False) == True and db.getSegmentHostName() == hostname:
retValue.append(db)
return retValue
# --------------------------------------------------------------------
def get_primary_root_datadirs(self):
"""
Returns a list of primary data directories minus the <prefix><contentid>
NOTE 1:
This currently assumes that all segments are configured the same
and gets the results only from the host of segment 0
NOTE 2:
The determination of hostname is based on faulty logic
"""
primary_datadirs = []
seg0_hostname = self.segments[0].primaryDB.getSegmentAddress()
(seg0_hostname, inf_num) = get_host_interface(seg0_hostname)
for db in self.getDbList():
if db.isSegmentPrimary(False) and db.getSegmentAddress().startswith(seg0_hostname):
primary_datadirs.append(db.datadir[:db.datadir.rfind('/')])
return primary_datadirs
# --------------------------------------------------------------------
def get_mirror_root_datadirs(self):
"""
Returns a list of mirror data directories minus the <prefix><contentid>
"""
mirror_datadirs = []
seg0_hostname = self.segments[0].primaryDB.getSegmentAddress()
(seg0_hostname, inf_num) = get_host_interface(seg0_hostname)
for db in self.getDbList():
if db.isSegmentMirror(False) and db.getSegmentAddress().startswith(seg0_hostname):
mirror_datadirs.append(db.datadir[:db.datadir.rfind('/')])
return mirror_datadirs
# --------------------------------------------------------------------
def get_datadir_prefix(self):
"""
Returns the prefix portion of <prefix><contentid>
"""
start_last_dir = self.master.datadir.rfind('/') + 1
start_dir_content = self.master.datadir.rfind('-')
prefix = self.master.datadir[start_last_dir:start_dir_content]
return prefix
# --------------------------------------------------------------------
# If we've got recovered segments, and we have a matched-pair, we
# can update the catalog to "rebalance" back to our original primary.
def updateRoleForRecoveredSegs(self, dbURL):
"""
Marks the segment role to match the configured preferred_role.
"""
# walk our list of segments, checking to make sure that
# both members of the peer-group are in our recovered-list,
# save their content-id.
recovered_contents = []
for seg in self.segments:
if seg.primaryDB:
if seg.primaryDB.dbid in self.recoveredSegmentDbids:
if len(seg.mirrorDBs) > 0 and seg.mirrorDBs[0].dbid in self.recoveredSegmentDbids:
recovered_contents.append((seg.primaryDB.content, seg.primaryDB.dbid, seg.mirrorDBs[0].dbid))
conn = dbconn.connect(dbURL, True, allowSystemTableMods = 'dml')
for (content_id, primary_dbid, mirror_dbid) in recovered_contents:
sql = "UPDATE gp_segment_configuration SET role=preferred_role where content = %d" % content_id
dbconn.executeUpdateOrInsert(conn, sql, 2)
# NOTE: primary-dbid (right now) is the mirror.
sql = "INSERT INTO gp_configuration_history VALUES (now(), %d, 'Reassigned role for content %d to MIRROR')" % (primary_dbid, content_id)
dbconn.executeUpdateOrInsert(conn, sql, 1)
# NOTE: mirror-dbid (right now) is the primary.
sql = "INSERT INTO gp_configuration_history VALUES (now(), %d, 'Reassigned role for content %d to PRIMARY')" % (mirror_dbid, content_id)
dbconn.executeUpdateOrInsert(conn, sql, 1)
# We could attempt to update the segments-array.
# But the caller will re-read the configuration from the catalog.
dbconn.execSQL(conn, "COMMIT")
conn.close()
# --------------------------------------------------------------------
def addExpansionSeg(self, content, preferred_role, dbid, role,
hostname, address, port, datadir, replication_port, fileSpaces = None):
"""
Adds a segment to the gparray as an expansion segment.
Note: may work better to construct the new GpDB in gpexpand and
simply pass it in.
"""
if (content <= self.segments[-1].get_dbs()[0].content):
raise Exception('Invalid content ID for expansion segment')
segdb = GpDB(content = content,
preferred_role = preferred_role,
dbid = dbid,
role = role,
mode = MODE_SYNCHRONIZED,
status = STATUS_UP,
hostname = hostname,
address = address,
port = port,
datadir = datadir,
replicationPort = replication_port) # todo: add to parameters
if fileSpaces != None:
for fsOid in fileSpaces:
segdb.addSegmentFilespace(oid = fsOid, path = fileSpaces[fsOid])
seglen = len(self.segments)
expseglen = len(self.expansionSegments)
expseg_index = content - seglen
logger.debug('New segment index is %d' % expseg_index)
if expseglen < expseg_index + 1:
extendByNum = expseg_index - expseglen + 1
logger.debug('Extending expansion array by %d' % (extendByNum))
self.expansionSegments.extend([None] * (extendByNum))
if self.expansionSegments[expseg_index] == None:
self.expansionSegments[expseg_index] = Segment()
seg = self.expansionSegments[expseg_index]
if preferred_role == ROLE_PRIMARY:
if seg.primaryDB:
raise Exception('Duplicate content id for primary segment')
seg.addPrimary(segdb)
else:
seg.addMirror(segdb)
# --------------------------------------------------------------------
def reOrderExpansionSegs(self):
"""
The expansion segments content ID may have changed during the expansion.
This method will re-order the the segments into their proper positions.
Since there can be no gaps in the content id (see validateExpansionSegs),
the seg.expansionSegments list is the same length.
"""
seglen = len(self.segments)
expseglen = len(self.expansionSegments)
newExpansionSegments = []
newExpansionSegments.extend([None] * expseglen)
for seg in self.expansionSegments:
contentId = seg.primaryDB.getSegmentContentId()
index = contentId - seglen
newExpansionSegments[index] = seg
seg.expansionSegments = newExpansionSegments
# --------------------------------------------------------------------
def validateExpansionSegs(self):
""" Checks the segments added for various inconsistencies and errors.
"""
dbids = []
content = []
expansion_seg_count = 0
# make sure we have added at least one segment
if len(self.expansionSegments) == 0:
raise Exception('No expansion segments defined')
# how many mirrors?
mirrors_per_segment = len(self.segments[0].mirrorDBs)
for seg in self.expansionSegments:
# If a segment is 'None' that means we have a gap in the content ids
if seg is None:
raise Exception('Expansion segments do not have contiguous content ids.')
expansion_seg_count += 1
for segdb in seg.get_dbs():
dbids.append(segdb.getSegmentDbId())
if segdb.getSegmentRole() == ROLE_PRIMARY:
isprimary = True
else:
isprimary = False
content.append((segdb.getSegmentContentId(), isprimary))
# mirror count correct for this content id?
if mirrors_per_segment > 0:
if len(seg.mirrorDBs) != mirrors_per_segment:
raise Exception('Expansion segment has incorrect number of mirrors defined.')
else:
#shouldn't have any mirrors
if len(seg.mirrorDBs) != 0:
raise Exception('Expansion segment has a mirror segment defined but mirroring is not enabled.')
# check that the dbids are what they should be
dbids.sort()
# KAS Is the following really true? dbids don't need to be continuous
if dbids[0] != self.get_max_dbid() + 1:
raise Exception('Expansion segments have incorrect dbids')
for i in range(0, len(dbids) - 1):
if dbids[i] != dbids[i + 1] - 1:
raise Exception('Expansion segments have incorrect dbids')
# check that content ids are ok
valid_content = []
for i in range(self.segments[-1].primaryDB.content + 1,
self.segments[-1].primaryDB.content + 1 + len(self.expansionSegments)):
valid_content.append((i, True))
for j in range(0, mirrors_per_segment):
valid_content.append((i, False))
valid_content.sort(lambda x,y: cmp(x[0], y[0]) or cmp(x[1], y[1]))
content.sort(lambda x,y: cmp(x[0], y[0]) or cmp(x[1], y[1]))
if valid_content != content:
raise Exception('Invalid content ids')
# Check for redefinition data dirs and ports
datadirs = {}
used_ports = {}
used_replication_ports = {}
for db in self.getDbList(True):
datadir = db.getSegmentDataDirectory()
hostname = db.getSegmentHostName()
port = db.getSegmentPort()
replication_port = db.getSegmentReplicationPort()
if datadirs.has_key(hostname):
if datadir in datadirs[hostname]:
raise Exception('Data directory %s used multiple times on host %s' % (datadir, hostname))
else:
datadirs[hostname].append(datadir)
else:
datadirs[hostname] = []
datadirs[hostname].append(datadir)
# Check ports
if used_ports.has_key(hostname):
if db.port in used_ports[hostname]:
raise Exception('Port %d is used multiple times on host %s' % (port, hostname))
else:
used_ports[hostname].append(db.port)
else:
used_ports[hostname] = []
used_ports[hostname].append(db.port)
# Check replication ports
if replication_port != None:
if used_replication_ports.has_key(hostname):
if replication_port in used_replication_ports[hostname]:
raise Exception('Replication Port %d is used multiple times on host %s' % (replication_port, hostname))
else:
used_replication_ports[hostname].append(replication_port)
else:
used_replication_ports[hostname] = []
used_replication_ports[hostname].append(replication_port)
# Check for redefinition of filespace dirs
dbList = self.getDbList(includeExpansionSegs = True)
hostDict = GpArray.getSegmentsByHostName(dbList)
for host in hostDict:
segList = hostDict[host]
dirList = []
for seg in segList:
dirDict = seg.getSegmentFilespaces()
for oid in dirDict:
if dirDict[oid] in dirList:
raise Exception('Data directory %s used multiple times on host %s' % (datadir, hostname))
else:
dirList.append(dirDict[oid])
# --------------------------------------------------------------------
def addExpansionHosts(self, hosts, mirror_type):
""" Adds a list of hosts to the array, using the same data
directories as the original hosts. Also adds the mirrors
based on mirror_type.
"""
# remove interface numbers if they exist
existing_hosts = []
for host in self.get_hostlist(True):
if host not in existing_hosts:
existing_hosts.append(host)
new_hosts = []
for host in hosts:
# see if we already have the host
if host in existing_hosts or host in new_hosts:
continue
else:
new_hosts.append(host)
if len(new_hosts) == 0:
raise Exception('No new hosts to add')
""" Get the first segment's host name, and use this host's configuration as a prototype """
seg0_hostname = self.segments[0].primaryDB.getSegmentHostName()
primary_list = self.get_list_of_primary_segments_on_host(seg0_hostname)
mirror_list = self.get_list_of_mirror_segments_on_host(seg0_hostname)
interface_list = self.get_interface_numbers()
base_primary_port = self.get_min_primary_port()
base_mirror_port = 0
base_primary_replication_port = None
base_mirror_replication_port = None
if mirror_type != 'none':
base_mirror_port = self.get_min_mirror_port()
base_primary_replication_port = self.get_min_primary_replication_port()
base_mirror_replication_port = self.get_min_mirror_replication_port()
prefix = self.get_datadir_prefix()
interface_list = self.get_interface_numbers()
interface_list.sort()
rows = createSegmentRowsFromSegmentList( newHostlist = new_hosts
, interface_list = interface_list
, primary_segment_list = primary_list
, primary_portbase = base_primary_port
, mirror_type = mirror_type
, mirror_segment_list = mirror_list
, mirror_portbase = base_mirror_port
, dir_prefix = prefix
, primary_replication_portbase = base_primary_replication_port
, mirror_replication_portbase = base_mirror_replication_port
)
self._fixup_and_add_expansion_segments(rows, interface_list)
# --------------------------------------------------------------------
def addExpansionDatadirs(self, datadirs, mirrordirs, mirror_type, fs_dirs = None, fs_mirror_dirs = None):
""" Adds new segments based on new data directories to both original
hosts and hosts that were added by addExpansionHosts.
"""
max_primary_port = self.get_max_primary_port()
max_mirror_port = 0
max_primary_replication_port = None
max_mirror_replication_port = None
if mirror_type != 'none':
max_mirror_port = self.get_max_mirror_port()
max_primary_replication_port = self.get_max_primary_replication_port()
max_mirror_replication_port = self.get_max_mirror_replication_port()
interface_list = self.get_interface_numbers()
interface_list.sort()
prefix = self.get_datadir_prefix()
hosts = []
# Get all the hosts to add the data dirs to
for seg in self.getSegDbList(includeExpansionSegs = True):
host = seg.getSegmentHostName()
if host not in hosts:
hosts.append(host)
# Create the rows
tempPrimaryRP = None
tempMirrorRP = None
if mirror_type != 'none':
tempPrimaryRP = max_primary_replication_port + 1
tempMirrorRP = max_mirror_replication_port + 1
rows = createSegmentRows( hostlist = hosts
, interface_list = interface_list
, primary_list = datadirs
, primary_portbase = max_primary_port + 1
, mirror_type = mirror_type
, mirror_list = mirrordirs
, mirror_portbase = max_mirror_port + 1
, dir_prefix = prefix
, primary_replication_portbase = tempPrimaryRP
, mirror_replication_portbase = tempMirrorRP
, primary_fs_list = fs_dirs
, mirror_fs_list = fs_mirror_dirs
)
self._fixup_and_add_expansion_segments(rows, interface_list)
# --------------------------------------------------------------------
def _fixup_and_add_expansion_segments(self, rows, interface_list):
"""Fixes up expansion segments added to be after the original segdbs
This includes fixing up the dbids, content ids, data directories,
interface part of the hostnames and mirrors. After this is done, it
adds them to the expansion array."""
interface_count = len(interface_list)
mirror_dict = {}
# must be sorted by isprimary, then hostname
rows.sort(lambda a,b: (cmp(b.isprimary, a.isprimary) or cmp(a.host,b.host)))
current_host = rows[0].host
curr_dbid = self.get_max_dbid(True) + 1
curr_content = self.get_max_contentid(True) + 1
# Fix up the rows with correct dbids, contentids, datadirs and interfaces
for row in rows:
hostname = row.host
address = row.address
# Add the new segment to the expansion segments array
# Remove the content id off of the datadir
new_datadir = row.fulldir[:row.fulldir.rfind(str(row.content))]
if row.isprimary == 't':
new_datadir += ('%d' % curr_content)
new_filespaces = GpDB.replaceFileSpaceContentID( fileSpaceDictionary = row.fileSpaceDictionary
, oldContent = row.content
, newContent = curr_content
)
self.addExpansionSeg(curr_content, ROLE_PRIMARY, curr_dbid,
ROLE_PRIMARY, hostname, address, int(row.port), new_datadir, row.prPort, fileSpaces = new_filespaces)
# The content id was adjusted, so we need to save it for the mirror
mirror_dict[int(row.content)] = int(curr_content)
curr_content += 1
else:
new_content = mirror_dict[int(row.content)]
new_datadir += ('%d' % int(new_content))
new_filespaces = GpDB.replaceFileSpaceContentID( fileSpaceDictionary = row.fileSpaceDictionary
, oldContent = row.content
, newContent = new_content
)
self.addExpansionSeg(new_content, ROLE_MIRROR, curr_dbid,
ROLE_MIRROR, hostname, address, int(row.port), new_datadir, row.prPort, fileSpaces = new_filespaces)
curr_dbid += 1
def guessIsMultiHome(self):
"""
Guess whether self is a multi-home (multiple interfaces per node) cluster
"""
segments = self.getSegDbList()
byHost = GpArray.getSegmentsByHostName(segments)
byAddress = GpArray.getSegmentsGroupedByValue(segments, GpDB.getSegmentAddress)
return len(byHost) != len(byAddress)
def guessIsSpreadMirror(self):
"""
Guess whether self is a spread mirroring configuration.
"""
if self.getFaultStrategy() != FAULT_STRATEGY_FILE_REPLICATION:
return False
mirrors = [seg for seg in self.getSegDbList() if seg.isSegmentMirror(current_role=False)]
primaries = [seg for seg in self.getSegDbList() if seg.isSegmentPrimary(current_role=False)]
assert len(mirrors) == len(primaries)
primaryHostNameToMirrorHostNameSet = {}
mirrorsByContentId = GpArray.getSegmentsByContentId(mirrors)
for primary in primaries:
mir = mirrorsByContentId[primary.getSegmentContentId()][0]
if primary.getSegmentHostName() not in primaryHostNameToMirrorHostNameSet:
primaryHostNameToMirrorHostNameSet[primary.getSegmentHostName()] = {}
primaryMap = primaryHostNameToMirrorHostNameSet[primary.getSegmentHostName()]
if mir.getSegmentHostName() not in primaryMap:
primaryMap[mir.getSegmentHostName()] = 0
primaryMap[mir.getSegmentHostName()] += 1
"""
This primary host has more than one segment on a single host: assume group mirroring!
"""
if primaryMap[mir.getSegmentHostName()] > 1:
return False
"""
Fall-through -- note that for a 2 host system with 1 segment per host, this will cause the guess to be 'spread'
"""
return True
@staticmethod
def getSegmentsGroupedByValue(segments, segmentMethodToGetValue):
result = {}
for segment in segments:
value = segmentMethodToGetValue(segment)
arr = result.get(value)
if arr is None:
result[value] = arr = []
arr.append(segment)
return result
@staticmethod
def getSegmentsByHostName(segments):
"""
Returns a map from segment host name to an array of segments (GpDB objects)
"""
return GpArray.getSegmentsGroupedByValue(segments, GpDB.getSegmentHostName)
@staticmethod
def getSegmentsByContentId(segments):
"""
Returns a map from segment contentId to an array of segments (GpDB objects)
"""
return GpArray.getSegmentsGroupedByValue(segments, GpDB.getSegmentContentId )
def getNumSegmentContents(self):
return len(GpArray.getSegmentsByContentId(self.getSegDbList()))
def getSegmentsAsLoadedFromDb(self):
"""
To be called by the configuration providers only
"""
return self.__segmentsAsLoadedFromDb
def setSegmentsAsLoadedFromDb(self, segments):
"""
To be called by the configuration providers only
"""
self.__segmentsAsLoadedFromDb = segments
def getStrategyAsLoadedFromDb(self):
"""
To be called by the configuration providers only
"""
return self.__strategyLoadedFromDb
def setStrategyAsLoadedFromDb(self, strategy):
"""
To be called by the configuration providers only
"""
self.__strategyLoadedFromDb = strategy
def get_segment_hosts(master_port):
"""
"""
gparray = GpArray.initFromCatalog( dbconn.DbURL(port=master_port), utility=True )
segments = GpArray.getSegmentsByHostName( gparray.getDbList() )
return segments.keys()
def get_session_ids(master_port):
"""
"""
conn = dbconn.connect( dbconn.DbURL(port=master_port), utility=True )
try:
rows = dbconn.execSQL(conn, "SELECT sess_id from pg_stat_activity where sess_id > 0;")
ids = set(row[0] for row in rows)
return ids
finally:
conn.close()
# === EOF ====