blob: 6f569673ad7a9e337da8b44883469ec1db7237ae [file] [log] [blame]
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Slider Agent
"""
from __future__ import with_statement
import platform
import os
import time
import shutil
from resource_management.core import shell
from resource_management.core.base import Fail
from resource_management.core import ExecuteTimeoutException
from resource_management.core.providers import Provider
from resource_management.core.logger import Logger
IS_WINDOWS = platform.system() == "Windows"
if not IS_WINDOWS:
import grp
import pwd
def _coerce_uid(user):
try:
uid = int(user)
except ValueError:
if not IS_WINDOWS:
try:
uid = pwd.getpwnam(user).pw_uid
except KeyError:
raise Fail("User %s doesn't exist." % user)
else:
raise Fail("User %s doesn't exist." % user)
return uid
def _coerce_gid(group):
try:
gid = int(group)
except ValueError:
if not IS_WINDOWS:
try:
gid = grp.getgrnam(group).gr_gid
except KeyError:
raise Fail("Group %s doesn't exist." % group)
else:
raise Fail("User %s doesn't exist." % user)
return gid
def _ensure_metadata(path, user, group, mode=None):
stat = os.stat(path)
if mode:
existing_mode = stat.st_mode & 07777
if existing_mode != mode:
Logger.info("Changing permission for %s from %o to %o" % (
path, existing_mode, mode))
os.chmod(path, mode)
## Slider apps should have no need to chown the uid or gid
## Keeping the code around as a reference
## if user:
## uid = _coerce_uid(user)
## if stat.st_uid != uid:
## Logger.info(
## "Changing owner for %s from %d to %s" % (path, stat.st_uid, user))
## os.chown(path, uid, -1)
## if group:
## gid = _coerce_gid(group)
## if stat.st_gid != gid:
## Logger.info(
## "Changing group for %s from %d to %s" % (path, stat.st_gid, group))
## os.chown(path, -1, gid)
class FileProvider(Provider):
def action_create(self):
path = self.resource.path
if os.path.isdir(path):
raise Fail("Applying %s failed, directory with name %s exists" % (self.resource, path))
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
raise Fail("Applying %s failed, parent directory %s doesn't exist" % (self.resource, dirname))
write = False
content = self._get_content()
if not os.path.exists(path):
write = True
reason = "it doesn't exist"
elif self.resource.replace:
if content is not None:
with open(path, "rb") as fp:
old_content = fp.read()
if content != old_content:
write = True
reason = "contents don't match"
if self.resource.backup:
self.resource.env.backup_file(path)
if write:
Logger.info("Writing %s because %s" % (self.resource, reason))
with open(path, "wb") as fp:
if content:
fp.write(content)
_ensure_metadata(self.resource.path, self.resource.owner,
self.resource.group, mode=self.resource.mode)
def action_delete(self):
path = self.resource.path
if os.path.isdir(path):
raise Fail("Applying %s failed, %s is directory not file!" % (self.resource, path))
if os.path.exists(path):
Logger.info("Deleting %s" % self.resource)
os.unlink(path)
def _get_content(self):
content = self.resource.content
if content is None:
return None
elif isinstance(content, basestring):
return content
elif hasattr(content, "__call__"):
return content()
raise Fail("Unknown source type for %s: %r" % (self, content))
class DirectoryProvider(Provider):
def action_create(self):
path = self.resource.path
if not os.path.exists(path):
Logger.info("Creating directory %s" % self.resource)
if self.resource.recursive:
os.makedirs(path, self.resource.mode or 0755)
else:
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
raise Fail("Applying %s failed, parent directory %s doesn't exist" % (self.resource, dirname))
os.mkdir(path, self.resource.mode or 0755)
if not os.path.isdir(path):
raise Fail("Applying %s failed, file %s already exists" % (self.resource, path))
_ensure_metadata(path, self.resource.owner, self.resource.group,
mode=self.resource.mode)
if self.resource.content and os.path.isdir(self.resource.content):
Logger.info("Copying from " + self.resource.content + " to " + path)
for item in os.listdir(self.resource.content):
src = os.path.join(self.resource.content, item)
dst = os.path.join(path, item)
if not os.path.isdir(src):
Logger.info("Copying " + src + " as " + dst)
shutil.copy2(src, dst)
def action_delete(self):
path = self.resource.path
if os.path.exists(path):
if not os.path.isdir(path):
raise Fail("Applying %s failed, %s is not a directory" % (self.resource, path))
Logger.info("Removing directory %s and all its content" % self.resource)
shutil.rmtree(path)
class LinkProvider(Provider):
def action_create(self):
path = self.resource.path
if os.path.lexists(path):
oldpath = os.path.realpath(path)
if oldpath == self.resource.to:
return
if not os.path.islink(path):
raise Fail(
"%s trying to create a symlink with the same name as an existing file or directory" % self)
Logger.info("%s replacing old symlink to %s" % (self.resource, oldpath))
os.unlink(path)
if self.resource.hard:
if not os.path.exists(self.resource.to):
raise Fail("Failed to apply %s, linking to nonexistent location %s" % (self.resource, self.resource.to))
if os.path.isdir(self.resource.to):
raise Fail("Failed to apply %s, cannot create hard link to a directory (%s)" % (self.resource, self.resource.to))
Logger.info("Creating hard %s" % self.resource)
os.link(self.resource.to, path)
else:
if not os.path.exists(self.resource.to):
Logger.info("Warning: linking to nonexistent location %s" % self.resource.to)
Logger.info("Creating symbolic %s" % self.resource)
os.symlink(self.resource.to, path)
def action_delete(self):
path = self.resource.path
if os.path.exists(path):
Logger.info("Deleting %s" % self.resource)
os.unlink(path)
def _preexec_fn(resource):
def preexec():
if resource.group:
gid = _coerce_gid(resource.group)
os.setgid(gid)
os.setegid(gid)
return preexec
class ExecuteProvider(Provider):
def action_run(self):
if self.resource.creates:
if os.path.exists(self.resource.creates):
return
Logger.debug("Executing %s" % self.resource)
if self.resource.path != []:
if not self.resource.environment:
self.resource.environment = {}
self.resource.environment['PATH'] = os.pathsep.join(self.resource.path)
for i in range (0, self.resource.tries):
try:
shell.checked_call(self.resource.command, logoutput=self.resource.logoutput,
cwd=self.resource.cwd, env=self.resource.environment,
preexec_fn=_preexec_fn(self.resource), user=self.resource.user,
wait_for_finish=self.resource.wait_for_finish, timeout=self.resource.timeout,
pid_file=self.resource.pid_file, poll_after=self.resource.poll_after)
break
except Fail as ex:
if i == self.resource.tries-1: # last try
raise ex
else:
Logger.info("Retrying after %d seconds. Reason: %s" % (self.resource.try_sleep, str(ex)))
time.sleep(self.resource.try_sleep)
except ExecuteTimeoutException:
err_msg = ("Execution of '%s' was killed due timeout after %d seconds") % (self.resource.command, self.resource.timeout)
if self.resource.on_timeout:
Logger.info("Executing '%s'. Reason: %s" % (self.resource.on_timeout, err_msg))
shell.checked_call(self.resource.on_timeout)
else:
raise Fail(err_msg)
class ExecuteScriptProvider(Provider):
def action_run(self):
from tempfile import NamedTemporaryFile
Logger.info("Running script %s" % self.resource)
with NamedTemporaryFile(prefix="resource_management-script", bufsize=0) as tf:
tf.write(self.resource.code)
tf.flush()
_ensure_metadata(tf.name, self.resource.user, self.resource.group)
shell.call([self.resource.interpreter, tf.name],
cwd=self.resource.cwd, env=self.resource.environment,
preexec_fn=_preexec_fn(self.resource))