#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
| """define Hdfs as subclass of Service""" |

# -*- python -*-

import os

from service import *
from hodlib.Hod.nodePool import *
from hodlib.Common.desc import CommandDesc
from hodlib.Common.util import get_exception_string, parseEquals

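# Illustrative sketch (not part of HOD itself) of how the classes below are
# typically driven by HOD's cluster-setup code. The names 'sd' (a service
# descriptor) and 'np' (a node pool) are hypothetical placeholders;
# MasterSlave and NodeRequest are expected to come from the star imports above.
#
#   hdfs = Hdfs(sd, np, required_node=None, version=17, format=True)
#   master_cmds = hdfs.getMasterCommands({})   # optional format run + namenode
#   worker_cmds = hdfs.getWorkerCommands({})   # datanode command(s) per hodring
#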
class HdfsExternal(MasterSlave):
  """dummy proxy to external HDFS instance"""

  def __init__(self, serviceDesc, workDirs, version):
    MasterSlave.__init__(self, serviceDesc, workDirs, None)
    self.launchedMaster = True
    self.masterInitialized = True
    self.version = version

  def getMasterRequest(self):
    return None

  def getMasterCommands(self, serviceDict):
    return []

  def getAdminCommands(self, serviceDict):
    return []

  def getWorkerCommands(self, serviceDict):
    return []

  def getMasterAddrs(self):
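    """Return the configured fs.default.name of the external namenode as a one-element list."""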
    attrs = self.serviceDesc.getfinalAttrs()
    addr = attrs['fs.default.name']
    return [addr]

  def setMasterParams(self, dict):
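    """Record the external namenode's fs.default.name and web UI address in the final attributes."""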
    self.serviceDesc.dict['final-attrs']['fs.default.name'] = "%s:%s" % \
      (dict['host'], dict['fs_port'])

    if self.version < 16:
      self.serviceDesc.dict['final-attrs']['dfs.info.port'] = \
        str(self.serviceDesc.dict['info_port'])
    else:
      # After HADOOP-2185
      self.serviceDesc.dict['final-attrs']['dfs.http.address'] = "%s:%s" % \
        (dict['host'], dict['info_port'])

  def getInfoAddrs(self):
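    """Return the HDFS web UI (info) address of the external instance as a one-element list."""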
    attrs = self.serviceDesc.getfinalAttrs()
    if self.version < 16:
      addr = attrs['fs.default.name']
      k, v = addr.split(":")
      infoaddr = k + ':' + attrs['dfs.info.port']
    else:
      # After HADOOP-2185
      infoaddr = attrs['dfs.http.address']
    return [infoaddr]

class Hdfs(MasterSlave):
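  """HDFS service provisioned by HOD: the namenode runs as the master and datanodes run as workers."""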

  def __init__(self, serviceDesc, nodePool, required_node, version,
               format=True, upgrade=False, workers_per_ring=1):
    MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
    self.masterNode = None
    self.masterAddr = None
    self.runAdminCommands = True
    self.infoAddr = None
    self._isLost = False
    self.format = format
    self.upgrade = upgrade
    self.workers = []
    self.version = version
    self.workers_per_ring = workers_per_ring

  def getMasterRequest(self):
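    """Request a single node on which the namenode will be started."""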
    req = NodeRequest(1, [], False)
    return req

  def getMasterCommands(self, serviceDict):
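    """Return the namenode commands: an optional format run, then the namenode itself (with -upgrade when upgrading)."""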

    masterCommands = []
    if self.format:
      masterCommands.append(self._getNameNodeCommand(True))

    if self.upgrade:
      masterCommands.append(self._getNameNodeCommand(False, True))
    else:
      masterCommands.append(self._getNameNodeCommand(False))

    return masterCommands

  def getAdminCommands(self, serviceDict):
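    """Return dfsadmin commands, issued once after an upgrade, that wait for safemode to exit and then finalize the upgrade."""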

    adminCommands = []
    if self.upgrade and self.runAdminCommands:
      adminCommands.append(self._getNameNodeAdminCommand('-safemode wait'))
      adminCommands.append(self._getNameNodeAdminCommand('-finalizeUpgrade',
                                                         True, True))

    self.runAdminCommands = False
    return adminCommands

  def getWorkerCommands(self, serviceDict):
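    """Return one datanode command for each of the workers_per_ring datanodes to run on a hodring."""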
    workerCmds = []
    for id in range(1, self.workers_per_ring + 1):
      workerCmds.append(self._getDataNodeCommand(str(id)))

    return workerCmds

  def setMasterNodes(self, list):
    node = list[0]
    self.masterNode = node

  def getMasterAddrs(self):
    return [self.masterAddr]

  def getInfoAddrs(self):
    return [self.infoAddr]

  def getWorkers(self):
    return self.workers

  def setMasterParams(self, list):
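    """Parse the key=value strings published by the namenode and record its address and web UI (info) address."""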
    dict = self._parseEquals(list)
    self.masterAddr = dict['fs.default.name']
    k, v = self.masterAddr.split(":")
    self.masterNode = k
    if self.version < 16:
      self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
    else:
      # After HADOOP-2185
      self.infoAddr = dict['dfs.http.address']

  def _parseEquals(self, list):
    return parseEquals(list)

  def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
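    """Fill workDirs and attrs with per-allocation dfs.name.dir, dfs.data.dir and hadoop.tmp.dir locations under the given parent directories."""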
    namedir = None
    hadooptmpdir = None
    datadir = []

    for p in parentDirs:
      workDirs.append(p)
      workDirs.append(os.path.join(p, subDir))
      dir = os.path.join(p, subDir, 'dfs-data')
      datadir.append(dir)
      if not hadooptmpdir:
        # Not used currently, generating hadooptmpdir just in case
        hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')

      if not namedir:
        namedir = os.path.join(p, subDir, 'dfs-name')

    workDirs.append(namedir)
    workDirs.extend(datadir)

    # FIXME!! use csv
    attrs['dfs.name.dir'] = namedir
    attrs['hadoop.tmp.dir'] = hadooptmpdir
    attrs['dfs.data.dir'] = ','.join(datadir)
    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"


  def _getNameNodeCommand(self, format=False, upgrade=False):
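    """Build the CommandDesc that runs bin/hadoop namenode, optionally with
    -format or -upgrade; missing addresses are left as fillin* placeholders."""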
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    if 'fs.default.name' not in attrs:
      attrs['fs.default.name'] = 'fillinhostport'

    if self.version < 16:
      if 'dfs.info.port' not in attrs:
        attrs['dfs.info.port'] = 'fillinport'
    else:
      # Addressing HADOOP-2185; earlier versions do not care about this
      if 'dfs.http.address' not in attrs:
        attrs['dfs.http.address'] = 'fillinhostport'

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'namenode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['namenode']
    if format:
      argv.append('-format')
    elif upgrade:
      argv.append('-upgrade')
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if format:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'
    cmd = CommandDesc(dict)
    return cmd

  def _getNameNodeAdminCommand(self, adminCommand, wait=True, ignoreFailures=False):
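    """Build the CommandDesc that runs bin/hadoop dfsadmin <adminCommand> against the running namenode."""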
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn is None:
      raise ValueError("Can't get namenode address")

    attrs['fs.default.name'] = nn

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'dfsadmin' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['dfsadmin']
    argv.append(adminCommand)
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if wait:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'
    if ignoreFailures:
      dict['ignorefailures'] = 'Y'
    cmd = CommandDesc(dict)
    return cmd

  def _getDataNodeCommand(self, id):
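    """Build the CommandDesc that runs bin/hadoop datanode; id keeps workdirs unique when a hodring hosts several datanodes."""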

    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn is None:
      raise ValueError("Can't get namenode address")

    attrs['fs.default.name'] = nn

    if self.version < 16:
      if 'dfs.datanode.port' not in attrs:
        attrs['dfs.datanode.port'] = 'fillinport'
      if 'dfs.datanode.info.port' not in attrs:
        attrs['dfs.datanode.info.port'] = 'fillinport'
    else:
      # After HADOOP-2185
      if 'dfs.datanode.address' not in attrs:
        attrs['dfs.datanode.address'] = 'fillinhostport'
      if 'dfs.datanode.http.address' not in attrs:
        attrs['dfs.datanode.http.address'] = 'fillinhostport'

      if self.version >= 18:
        # After HADOOP-3283
        # TODO: check for major as well as minor versions
        attrs['dfs.datanode.ipc.address'] = 'fillinhostport'

    # unique workdirs in case of multiple datanodes per hodring
    pd = []
    for dir in parentDirs:
      dir = dir + "-" + id
      pd.append(dir)
    parentDirs = pd
    # end of unique workdirs

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')

    dict = { 'name' : 'datanode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    dict['argv'] = ['datanode']
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()

    cmd = CommandDesc(dict)
    return cmd