blob: 55a5b8e86eb9464c1fac5d0cf06d88e9c15319d0 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''
Greenplum hostcache file facilities.
This module contains helper functions for mapping the network
interface names used in gp_configuration to a collapsed set
of hostnames.
example: sdw1-1, sdw1-2, sdw1-3, and sdw1-4 are all located
on sdw1.
The results of this collapsing will be stored in a file:
~/.gphostcache with entries of the form:
sdw1-1:sdw1
sdw1-2:sdw1
sdw1-3:sdw1
A big complication here is that we want to group all of the
segment databases for sdw1-1 through sdw1-4 together, but we can't
use the name returned by `hostname`, because it is not guaranteed to
have a trusted ssh environment set up for it.
'''
import os
from gppylib import gparray
from gppylib.commands import base
from gppylib.commands import unix
from gppylib import gplog
from gppylib.utils import readAllLinesFromFile
# The host cache lives in the home directory of the user running the tool.
FILEDIR = os.path.expanduser("~")
FILENAME = ".gphostcache"
CACHEFILE = "/".join((FILEDIR, FILENAME))

# Logger shared by everything in this module.
logger = gplog.get_default_logger()
class GpHost:
    '''A host together with the segment databases registered on it.'''

    def __init__(self, hostname):
        # Name used to refer to (and reach) this host.
        self.hostname = hostname
        # Database objects added through addDB(), in insertion order.
        self.dbs = []

    def addDB(self, db):
        '''Record that db lives on this host.'''
        self.dbs.append(db)

    def __str__(self):
        '''Describe the host and the data directories of its databases.'''
        datadirs = ','.join([seg.datadir for seg in self.dbs])
        return "Host %s has Datadirs: [%s]" % (self.hostname, datadirs)
class GpInterfaceToHostNameCache:
    '''
    Resolve network interface names to the hostnames they belong to.

    Answers are taken from, in order:
      1. interface:hostname entries previously saved in CACHEFILE,
      2. the tentative hostnames supplied by the caller,
      3. a remote unix.Hostname command, run through the worker pool, for
         every interface that is still unresolved or whose cached and
         supplied answers disagree.
    When any remote lookup is issued, the resulting mapping is written back
    to CACHEFILE.
    '''

    def __init__(self, pool, interfacesToLookup, currentHostNameAnswersForInterfaces):
        '''
        pool -- worker pool used to run the remote hostname lookups.
        interfacesToLookup -- interface names to resolve.
        currentHostNameAnswersForInterfaces -- the caller's tentative hostname
            for each interface (None when unknown), paired positionally with
            interfacesToLookup.
        '''
        self.__hostCache = {}  # interface -> hostname (None = still unresolved)

        self._readCacheFile()
        self._reconcile(interfacesToLookup, currentHostNameAnswersForInterfaces)
        issuedLookups = self._lookupUnresolved(pool)

        # Only rewrite the cache file if we actually asked any host.
        if issuedLookups:
            self._writeCacheFile()

    def _readCacheFile(self):
        '''
        Load interface:hostname pairs from CACHEFILE, if the file exists.

        Lines starting with '#' and lines that are not exactly two non-empty
        ':'-separated fields are ignored.  Read errors are logged and
        otherwise ignored; the cache file is only an optimization.
        '''
        if not os.path.isfile(CACHEFILE):
            return
        try:
            for line in readAllLinesFromFile(CACHEFILE, stripLines=True, skipEmptyLines=True):
                # line[0] is safe because empty lines are skipped.
                if line[0] == '#':
                    continue
                arr = line.split(':')
                if len(arr) == 2 and len(arr[0]) > 0 and len(arr[1]) > 0:
                    (interface, hostname) = arr
                    self.__hostCache[interface.strip()] = hostname.strip()
        except Exception as e:
            logger.warn("Error reading file '%s': %s" % (CACHEFILE, str(e)))

    def _reconcile(self, interfaces, hostnames):
        '''
        Merge the caller's tentative answers into the cache.

        - Interface not yet known: take the caller's answer (which may be
          None, forcing a remote lookup later).
        - Caller has no answer (None): keep whatever is already cached.
        - Cached value is None: take the caller's answer.
        - Cached value differs from the caller's answer: log a warning and
          reset the entry to None so it is looked up again.
        '''
        inconsistent = []
        for interface, hostname in zip(interfaces, hostnames):
            if interface not in self.__hostCache:
                self.__hostCache[interface] = hostname
            elif hostname is None:
                # No tentative answer; an earlier hit on this interface
                # already decided whether a lookup is needed.
                pass
            elif self.__hostCache[interface] is None:
                self.__hostCache[interface] = hostname
            elif self.__hostCache[interface] != hostname:
                logger.warn("inconsistent hostname '%s' for interface '%s' and expected hostname '%s'" %
                            (self.__hostCache[interface], interface, hostname))
                inconsistent.append(interface)

        # Clear out inconsistent hostnames to force a recheck.
        for interface in inconsistent:
            self.__hostCache[interface] = None

    def _lookupUnresolved(self, pool):
        '''
        Run a remote hostname lookup for every interface whose hostname is None.

        Interfaces whose lookup command fails are logged and left as None.
        Returns True if at least one lookup was issued, False otherwise.
        '''
        pending_cmds = {}
        for interface, hostname in self.__hostCache.items():
            if hostname is None:
                logger.debug("hostname lookup for %s" % interface)
                cmd = unix.Hostname('host lookup', ctxt=base.REMOTE, remoteHost=interface)
                pool.addCommand(cmd)
                pending_cmds[interface] = cmd

        if not pending_cmds:
            return False

        # Fetch the results out of the WorkerPool.
        pool.join()
        for interface, cmd in pending_cmds.items():
            if cmd.get_results().rc != 0:
                logger.warn("Failed to resolve hostname for %s" % interface)
                continue
            self.__hostCache[interface] = cmd.get_hostname()
        pool.empty_completed_items()
        return True

    def _writeCacheFile(self):
        '''
        Write every resolved interface:hostname pair to CACHEFILE, sorted by
        interface.  Unresolved (empty/None) entries are skipped.  Write errors
        are logged, not raised.
        '''
        try:
            # 'with' guarantees the file is closed even if a write fails.
            with open(CACHEFILE, 'w') as fp:
                for interface in sorted(self.__hostCache):
                    hostname = self.__hostCache[interface]
                    # Skip any dangling references we still have.
                    if not hostname:
                        continue
                    fp.write("%s:%s\n" % (interface, hostname))
        except Exception as e:
            logger.warn(str(e))
            logger.warn("Failed to write file '%s'" % CACHEFILE)

    def getHostName(self, interface):
        '''
        Return the cached hostname for interface.

        Should only be called for interfaces passed to the constructor.
        Returns None if the hostname could not be determined; raises KeyError
        for an interface the cache has never seen.
        '''
        return self.__hostCache[interface]
class GpHostCache:
    '''
    Group the databases described by a GpArray by the host they run on.

    Each database's interface address is resolved to a hostname through
    GpInterfaceToHostNameCache; databases that resolve to the same hostname
    are collected into one GpHost.  The mapping is kept in
    self.gphost_map (hostname -> GpHost).
    '''

    def __init__(self, gparray, pool, skiplist=None, withMasters=False):
        '''
        gparray -- the GpArray whose databases are grouped.
        pool -- worker pool used for any remote hostname lookups.
        skiplist -- databases to leave out, matched by dbid (default: none).
            None replaces the former shared mutable [] default.
        withMasters -- include the master databases as well as the segments.
        '''
        self.gparray = gparray
        self.gphost_map = {}  # hostname -> GpHost

        # dbids of the databases that should be skipped.
        skipIds = set([db.getSegmentDbId() for db in (skiplist or [])])

        # Get list of segment dbs, optionally including masters.
        if withMasters:
            dblist = self.gparray.getDbList()
        else:
            dblist = self.gparray.getSegDbList()
        dbs = [db for db in dblist if db.getSegmentDbId() not in skipIds]

        # As of Greenplum 4.0 the catalog holds both the interface and the
        # hostname, so gparray can normally supply everything.  If it was
        # initialized from an old catalog or a flat file, the hostname may be
        # missing; collecting every interface first lets
        # GpInterfaceToHostNameCache look each one up at most once.
        interfaces = [db.getSegmentAddress() for db in dbs]
        hostnames = [db.getSegmentHostName() for db in dbs]
        interfaceToHostMap = GpInterfaceToHostNameCache(pool, interfaces, hostnames)

        # Build up the GpHosts using the interface->hostname lookup.
        for db in dbs:
            hostname = interfaceToHostMap.getHostName(db.getSegmentAddress())
            # Backfill the hostname for databases that did not have one
            # (e.g. loaded from an old catalog).
            if db.getSegmentHostName() is None:
                db.setSegmentHostName(hostname)
            # NOTE(review): if the lookup failed, hostname is None and such
            # databases are grouped under a None key -- confirm callers
            # expect this.
            if hostname not in self.gphost_map:
                self.gphost_map[hostname] = GpHost(hostname)
            self.gphost_map[hostname].addDB(db)

    def log_contents(self):
        '''Log, at debug level, every host and its data directories.'''
        logger.debug("Construct host-->datadirs mapping:")
        logger.debug('\n'.join([str(gphost) for gphost in self.gphost_map.values()]))

    def get_hostnames(self):
        '''Return the hostname attribute of every GpHost in the map.'''
        return [gphost.hostname for gphost in self.gphost_map.values()]

    def get_hosts(self):
        '''Return the GpHost objects in the map.'''
        return self.gphost_map.values()

    def get_host(self, hostname):
        '''Return the GpHost stored under hostname; raise Exception if absent.'''
        if hostname in self.gphost_map:
            return self.gphost_map[hostname]
        raise Exception("map does not contain host: %s" % hostname)

    def _find_reachable_address(self, hostname, dblist):
        '''
        Return the first interface address in dblist that answers a ping,
        or None if none does.  hostname is used only for logging.
        '''
        for db in dblist:
            address = db.getSegmentAddress()
            # Pinged serially: routing these through the pool is awkward and
            # this is not the expected path.
            pingCmd = unix.Ping("dbid: %d" % db.getSegmentDbId(), address)
            pingCmd.run()
            if pingCmd.get_results().rc == 0:
                logger.warning("alternate host: '%s' => '%s'" % (hostname, address))
                return address
            # Report the address that was actually pinged.
            logger.warning("Ping to host: '%s' FAILED" % address)
            logger.debug(" ping details: %s" % pingCmd)
        return None

    def ping_hosts(self, pool):
        '''
        Ping every host in the map, falling back to interface addresses for
        hosts that do not answer.

        For each host whose ping fails, its databases' interface addresses
        are pinged one at a time; the first that answers becomes the host's
        hostname (the map key is left unchanged).  If none answers, the host
        is removed from the map and all of its databases are added to the
        returned list.

        Returns the list of databases that could not be reached.
        '''
        failed_segs = []

        # Keep our own (hostname, command) pairs instead of scanning every
        # item the pool reports as completed, which could include unrelated
        # commands that have no hostToPing attribute.
        pings = []
        for hostname in list(self.gphost_map.keys()):
            p = unix.Ping('ping', hostname)
            pool.addCommand(p)
            pings.append((hostname, p))
        pool.join()

        for hostname, cmd in pings:
            if cmd.was_successful():
                continue
            logger.warning("Ping to host: '%s' FAILED" % hostname)
            logger.debug(" ping details: %s" % cmd)
            gphost = self.get_host(hostname)
            dblist = gphost.dbs
            alternateHost = self._find_reachable_address(hostname, dblist)
            if alternateHost:
                # NOTE(review): the map stays keyed by the old hostname, so
                # get_host(alternateHost) will not find this GpHost.
                gphost.hostname = alternateHost
            else:
                # No interface reaches any of the segments: report them all
                # and drop the host from the cache (what callers expect).
                failed_segs.extend(dblist)
                del self.gphost_map[hostname]

        pool.empty_completed_items()
        return failed_segs