#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""define Hdfs as subclass of Service"""
# -*- python -*-
import os
from service import *
from hodlib.Hod.nodePool import *
from hodlib.Common.desc import CommandDesc
from hodlib.Common.util import get_exception_string, parseEquals
class HdfsExternal(MasterSlave):
  """dummy proxy to external HDFS instance"""

  def __init__(self, serviceDesc, workDirs, version):
    MasterSlave.__init__(self, serviceDesc, workDirs, None)
    self.launchedMaster = True
    self.masterInitialized = True
    self.version = version

  def getMasterRequest(self):
    return None

  def getMasterCommands(self, serviceDict):
    return []

  def getAdminCommands(self, serviceDict):
    return []

  def getWorkerCommands(self, serviceDict):
    return []

  def getMasterAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    addr = attrs['fs.default.name']
    return [addr]

  def setMasterParams(self, dict):
    self.serviceDesc.dict['final-attrs']['fs.default.name'] = "%s:%s" % \
      (dict['host'], dict['fs_port'])

    if self.version < 16:
      self.serviceDesc.dict['final-attrs']['dfs.info.port'] = \
        str(self.serviceDesc.dict['info_port'])
    else:
      # After Hadoop-2185
      self.serviceDesc.dict['final-attrs']['dfs.http.address'] = "%s:%s" % \
        (dict['host'], dict['info_port'])

  def getInfoAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    if self.version < 16:
      addr = attrs['fs.default.name']
      k,v = addr.split(":")
      infoaddr = k + ':' + attrs['dfs.info.port']
    else:
      # After Hadoop-2185
      infoaddr = attrs['dfs.http.address']
    return [infoaddr]
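
# Hdfs describes an HDFS instance brought up by HOD itself: it produces a
# namenode command for the allocated master node and one or more datanode
# commands per hodring for the worker nodes.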
class Hdfs(MasterSlave):

  def __init__(self, serviceDesc, nodePool, required_node, version, \
                                        format=True, upgrade=False,
                                        workers_per_ring = 1):
    MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
    self.masterNode = None
    self.masterAddr = None
    self.runAdminCommands = True
    self.infoAddr = None
    self._isLost = False
    self.format = format
    self.upgrade = upgrade
    self.workers = []
    self.version = version
    self.workers_per_ring = workers_per_ring

  def getMasterRequest(self):
    req = NodeRequest(1, [], False)
    return req

  def getMasterCommands(self, serviceDict):
    masterCommands = []
    if self.format:
      masterCommands.append(self._getNameNodeCommand(True))

    if self.upgrade:
      masterCommands.append(self._getNameNodeCommand(False, True))
    else:
      masterCommands.append(self._getNameNodeCommand(False))

    return masterCommands

  def getAdminCommands(self, serviceDict):
    adminCommands = []
    if self.upgrade and self.runAdminCommands:
      adminCommands.append(self._getNameNodeAdminCommand('-safemode wait'))
      adminCommands.append(self._getNameNodeAdminCommand('-finalizeUpgrade',
                                                          True, True))
    self.runAdminCommands = False
    return adminCommands

  def getWorkerCommands(self, serviceDict):
    workerCmds = []
    for id in range(1, self.workers_per_ring + 1):
      workerCmds.append(self._getDataNodeCommand(str(id)))

    return workerCmds

  def setMasterNodes(self, list):
    node = list[0]
    self.masterNode = node

  def getMasterAddrs(self):
    return [self.masterAddr]

  def getInfoAddrs(self):
    return [self.infoAddr]

  def getWorkers(self):
    return self.workers

  def setMasterParams(self, list):
    dict = self._parseEquals(list)
    self.masterAddr = dict['fs.default.name']
    k,v = self.masterAddr.split(":")
    self.masterNode = k
    if self.version < 16:
      self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
    else:
      # After Hadoop-2185
      self.infoAddr = dict['dfs.http.address']

  def _parseEquals(self, list):
    return parseEquals(list)
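
  # Lays out dfs.name.dir, dfs.data.dir and hadoop.tmp.dir under each parent
  # work directory, recording the paths in workDirs and the corresponding
  # attribute/environment settings in attrs and envs.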
  def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
    namedir = None
    hadooptmpdir = None
    datadir = []

    for p in parentDirs:
      workDirs.append(p)
      workDirs.append(os.path.join(p, subDir))
      dir = os.path.join(p, subDir, 'dfs-data')
      datadir.append(dir)
      if not hadooptmpdir:
        # Not used currently, generating hadooptmpdir just in case
        hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')

      if not namedir:
        namedir = os.path.join(p, subDir, 'dfs-name')

    workDirs.append(namedir)
    workDirs.extend(datadir)

    # FIXME!! use csv
    attrs['dfs.name.dir'] = namedir
    attrs['hadoop.tmp.dir'] = hadooptmpdir
    attrs['dfs.data.dir'] = ','.join(datadir)
    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"
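
  # Builds the CommandDesc for starting (or formatting/upgrading) the namenode,
  # defaulting any unset address attributes to the 'fillinhostport'/'fillinport'
  # placeholders.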
  def _getNameNodeCommand(self, format=False, upgrade=False):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    if 'fs.default.name' not in attrs:
      attrs['fs.default.name'] = 'fillinhostport'

    if self.version < 16:
      if 'dfs.info.port' not in attrs:
        attrs['dfs.info.port'] = 'fillinport'
    else:
      # Addressing Hadoop-2185, added the following. Earlier versions don't
      # care about this
      if 'dfs.http.address' not in attrs:
        attrs['dfs.http.address'] = 'fillinhostport'

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'namenode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['namenode']
    if format:
      argv.append('-format')
    elif upgrade:
      argv.append('-upgrade')
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if format:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'

    cmd = CommandDesc(dict)
    return cmd
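
  # Builds the CommandDesc for a 'hadoop dfsadmin' invocation (e.g.
  # '-safemode wait' or '-finalizeUpgrade') against the running namenode.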
  def _getNameNodeAdminCommand(self, adminCommand, wait=True, ignoreFailures=False):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn == None:
      raise ValueError, "Can't get namenode address"

    attrs['fs.default.name'] = nn

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'dfsadmin' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['dfsadmin']
    argv.append(adminCommand)
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if wait:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'
    if ignoreFailures:
      dict['ignorefailures'] = 'Y'

    cmd = CommandDesc(dict)
    return cmd
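
  # Builds the CommandDesc for a datanode; 'id' keeps the work directories
  # unique when more than one datanode runs per hodring.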
  def _getDataNodeCommand(self, id):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn == None:
      raise ValueError, "Can't get namenode address"

    attrs['fs.default.name'] = nn

    if self.version < 16:
      if 'dfs.datanode.port' not in attrs:
        attrs['dfs.datanode.port'] = 'fillinport'
      if 'dfs.datanode.info.port' not in attrs:
        attrs['dfs.datanode.info.port'] = 'fillinport'
    else:
      # Adding the following. Hadoop-2185
      if 'dfs.datanode.address' not in attrs:
        attrs['dfs.datanode.address'] = 'fillinhostport'
      if 'dfs.datanode.http.address' not in attrs:
        attrs['dfs.datanode.http.address'] = 'fillinhostport'
      if self.version >= 18:
        # After HADOOP-3283
        # TODO: check for major as well as minor versions
        attrs['dfs.datanode.ipc.address'] = 'fillinhostport'

    # unique workdirs in case of multiple datanodes per hodring
    pd = []
    for dir in parentDirs:
      dir = dir + "-" + id
      pd.append(dir)
    parentDirs = pd
    # end of unique workdirs

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')

    dict = { 'name' : 'datanode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    dict['argv'] = ['datanode']
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()

    cmd = CommandDesc(dict)
    return cmd