#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import StringIO
import logging
import os
import shutil
import zipfile
import urllib2
logger = logging.getLogger()
class CachingException(Exception):
pass
class FileCache():
"""
  Provides caching and lookup for service metadata files.
  If service metadata is not present in the cache, the relevant
  files are downloaded from the server.
"""
STACKS_CACHE_DIRECTORY="stacks"
CUSTOM_ACTIONS_CACHE_DIRECTORY="custom_actions"
HASH_SUM_FILE=".hash"
ARCHIVE_NAME="archive.zip"
BLOCK_SIZE=1024*16
SOCKET_TIMEOUT=10
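
  # Cache layout implied by the constants above (illustrative):
  #   <cache_dir>/stacks/...          - service and hook scripts
  #   <cache_dir>/custom_actions/...  - custom action scripts
  # Each cached directory holds the unpacked contents of archive.zip plus a
  # .hash marker file that is used for up-to-date checks.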
def __init__(self, config):
self.service_component_pool = {}
self.config = config
self.cache_dir = config.get('agent', 'cache_dir')
    # Defines whether a command should fail when downloading scripts
    # from the server is not possible, or whether the agent should fall
    # back to its local copy
    self.tolerate_download_failures = \
      config.get('agent', 'tolerate_download_failures').lower() == 'true'
self.reset()
def reset(self):
    self.uptodate_paths = [] # Paths that have already been checked recently
def get_service_base_dir(self, command, server_url_prefix):
"""
    Returns the base directory for a service
"""
service_subpath = command['commandParams']['service_package_folder']
subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, service_subpath)
return self.provide_directory(self.cache_dir, subpath,
server_url_prefix)
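
  # Illustrative shape of the 'command' dict consumed above; the folder value
  # is a made-up example, not a real stack path:
  #   {'commandParams': {'service_package_folder': 'HDP/2.0.6/services/HDFS/package'}}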
def get_hook_base_dir(self, command, server_url_prefix):
"""
Returns a base directory for hooks
"""
try:
hooks_subpath = command['commandParams']['hooks_folder']
except KeyError:
return None
subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, hooks_subpath)
return self.provide_directory(self.cache_dir, subpath,
server_url_prefix)
def get_custom_actions_base_dir(self, server_url_prefix):
"""
Returns a base directory for custom action scripts
"""
return self.provide_directory(self.cache_dir,
self.CUSTOM_ACTIONS_CACHE_DIRECTORY,
server_url_prefix)
def provide_directory(self, cache_path, subdirectory, server_url_prefix):
"""
    Ensures that the directory in the cache is up-to-date. Throws a
    CachingException if any problem occurs
    Parameters:
    cache_path: full path to the cache directory
    subdirectory: subpath inside the cache
    server_url_prefix: URL of the "resources" folder on the server
"""
full_path = os.path.join(cache_path, subdirectory)
logger.debug("Trying to provide directory {0}".format(subdirectory))
try:
if full_path not in self.uptodate_paths:
logger.debug("Checking if update is available for "
"directory {0}".format(full_path))
# Need to check for updates at server
remote_url = self.build_download_url(server_url_prefix,
subdirectory, self.HASH_SUM_FILE)
memory_buffer = self.fetch_url(remote_url)
remote_hash = memory_buffer.getvalue().strip()
local_hash = self.read_hash_sum(full_path)
if not local_hash or local_hash != remote_hash:
logger.debug("Updating directory {0}".format(full_path))
download_url = self.build_download_url(server_url_prefix,
subdirectory, self.ARCHIVE_NAME)
membuffer = self.fetch_url(download_url)
self.invalidate_directory(full_path)
self.unpack_archive(membuffer, full_path)
self.write_hash_sum(full_path, remote_hash)
# Finally consider cache directory up-to-date
self.uptodate_paths.append(full_path)
    except CachingException, e:
      if self.tolerate_download_failures:
        # ignore
        logger.warn("Error occurred during cache update. "
                    "tolerate_download_failures is set to true, so "
                    "ignoring this error and continuing with the current cache. "
                    "Error details: {0}".format(str(e)))
      else:
        raise # we are not tolerant to exceptions, command execution will fail
return full_path
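
  # Illustrative flow (hypothetical arguments): provide_directory(
  #   '/var/lib/ambari-agent/cache', 'custom_actions',
  #   'http://ambari-server:8080/resources')
  # first fetches .../custom_actions/.hash, compares it with the local .hash,
  # and re-downloads and unpacks .../custom_actions/archive.zip only when the
  # two sums differ.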
def build_download_url(self, server_url_prefix,
directory, filename):
"""
    Builds a download URL for a file. Used for downloading files
    from the server.
    directory - relative path
    filename - file inside directory that we are trying to fetch
"""
return "{0}/{1}/{2}".format(server_url_prefix,
directory, filename)
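
  # Example with illustrative values:
  #   build_download_url('http://ambari-server:8080/resources',
  #                      'custom_actions', 'archive.zip')
  #   -> 'http://ambari-server:8080/resources/custom_actions/archive.zip'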
def fetch_url(self, url):
"""
    Fetches the content at url into an in-memory buffer and returns the
    resulting buffer. May throw exceptions for various reasons
"""
logger.debug("Trying to download {0}".format(url))
try:
memory_buffer = StringIO.StringIO()
u = urllib2.urlopen(url, timeout=self.SOCKET_TIMEOUT)
logger.debug("Connected with {0} with code {1}".format(u.geturl(),
u.getcode()))
      buff = u.read(self.BLOCK_SIZE)
      while buff:
        memory_buffer.write(buff)
        buff = u.read(self.BLOCK_SIZE)
return memory_buffer
except Exception, err:
raise CachingException("Can not download file from"
" url {0} : {1}".format(url, str(err)))
def read_hash_sum(self, directory):
"""
    Tries to read a hash sum from a previously generated file. Returns a
    string containing the hash, or None
"""
hash_file = os.path.join(directory, self.HASH_SUM_FILE)
try:
with open(hash_file) as fh:
return fh.readline().strip()
    except Exception:
      return None # We don't care
def write_hash_sum(self, directory, new_hash):
"""
    Writes the given hash sum to a file in the directory. Throws a
    CachingException if the file cannot be written
"""
hash_file = os.path.join(directory, self.HASH_SUM_FILE)
try:
with open(hash_file, "w") as fh:
fh.write(new_hash)
except Exception, err:
raise CachingException("Can not write to file {0} : {1}".format(hash_file,
str(err)))
def invalidate_directory(self, directory):
"""
    Recursively removes directory content (if any). Also creates the
    directory and any parent directories if needed. May throw exceptions
    on permission problems
"""
logger.debug("Invalidating directory {0}".format(directory))
try:
if os.path.isfile(directory): # It would be a strange situation
os.unlink(directory)
elif os.path.isdir(directory):
shutil.rmtree(directory)
# create directory itself and any parent directories
os.makedirs(directory)
    except Exception, err:
      raise CachingException("Cannot invalidate cache directory {0}: {1}"
                             .format(directory, str(err)))
def unpack_archive(self, mem_buffer, target_directory):
"""
    Unpacks the contents of an in-memory buffer to the file system.
    The buffer is expected to contain a valid zip archive
"""
try:
zfile = zipfile.ZipFile(mem_buffer)
for name in zfile.namelist():
(dirname, filename) = os.path.split(name)
concrete_dir=os.path.abspath(os.path.join(target_directory, dirname))
if not os.path.isdir(concrete_dir):
os.makedirs(concrete_dir)
logger.debug("Unpacking file {0} to {1}".format(name, concrete_dir))
if filename!='':
zfile.extract(name, target_directory)
    except Exception, err:
      raise CachingException("Cannot unpack zip file to "
                             "directory {0} : {1}".format(
                             target_directory, str(err)))
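
# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the original module. It assumes a
# ConfigParser-style config object exposing the two 'agent' keys read in
# FileCache.__init__; the cache path and server URL below are placeholders,
# not Ambari defaults.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
  import ConfigParser
  logging.basicConfig(level=logging.DEBUG)
  config = ConfigParser.ConfigParser()
  config.add_section('agent')
  config.set('agent', 'cache_dir', '/tmp/agent_cache') # placeholder path
  config.set('agent', 'tolerate_download_failures', 'true')
  cache = FileCache(config)
  # Placeholder "resources" URL prefix; download failures are tolerated, so
  # this falls back to the (possibly empty) local cache directory.
  print cache.get_custom_actions_base_dir('http://ambari-server:8080/resources')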