| #!/usr/bin/env python |
| |
| ''' |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ''' |
| import StringIO |
| |
| import logging |
| import os |
| import shutil |
| import zipfile |
| import urllib2 |
| |
# Module-wide logger. Passing no name (None) yields the root logger, so this
# module's messages follow the agent's root logging configuration.
logger = logging.getLogger(None)
| |
class CachingException(Exception):
    """Raised by FileCache when the local cache cannot be brought up to date."""
| |
class FileCache(object):
    """
    Provides caching and lookup for service metadata files.
    If service metadata is not available at cache,
    downloads relevant files from the server.

    Each cached directory holds a hash file (HASH_SUM_FILE). When the hash
    published by the server differs from the local one, the directory is
    wiped and re-populated from a zip archive (ARCHIVE_NAME) fetched from the
    same server location.
    """

    STACKS_CACHE_DIRECTORY = "stacks"
    CUSTOM_ACTIONS_CACHE_DIRECTORY = "custom_actions"
    HASH_SUM_FILE = ".hash"
    ARCHIVE_NAME = "archive.zip"

    BLOCK_SIZE = 1024 * 16  # bytes read per chunk while downloading
    SOCKET_TIMEOUT = 10  # passed as `timeout` to urlopen

    def __init__(self, config):
        """
        config: object exposing get(section, option); the 'agent' section
        must provide 'cache_dir' and 'tolerate_download_failures'.
        """
        self.service_component_pool = {}
        self.config = config
        self.cache_dir = config.get('agent', 'cache_dir')
        # Defines whether command should fail when downloading scripts
        # from the server is not possible or agent should rollback to local copy
        self.tolerate_download_failures = \
            config.get('agent', 'tolerate_download_failures').lower() == 'true'
        self.reset()

    def reset(self):
        """
        Forgets which paths were already verified, so the next
        provide_directory() call for each path checks the server again.
        """
        self.uptodate_paths = []  # Paths that already have been recently checked

    def get_service_base_dir(self, command, server_url_prefix):
        """
        Returns a base directory for service.
        Reads command['commandParams']['service_package_folder'] (a missing
        key raises KeyError) and provides it under STACKS_CACHE_DIRECTORY.
        """
        service_subpath = command['commandParams']['service_package_folder']
        subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, service_subpath)
        return self.provide_directory(self.cache_dir, subpath,
                                      server_url_prefix)

    def get_hook_base_dir(self, command, server_url_prefix):
        """
        Returns a base directory for hooks, or None when the command carries
        no command['commandParams']['hooks_folder'] entry.
        """
        try:
            hooks_subpath = command['commandParams']['hooks_folder']
        except KeyError:
            return None
        subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, hooks_subpath)
        return self.provide_directory(self.cache_dir, subpath,
                                      server_url_prefix)

    def get_custom_actions_base_dir(self, server_url_prefix):
        """
        Returns a base directory for custom action scripts
        """
        return self.provide_directory(self.cache_dir,
                                      self.CUSTOM_ACTIONS_CACHE_DIRECTORY,
                                      server_url_prefix)

    def provide_directory(self, cache_path, subdirectory, server_url_prefix):
        """
        Ensures that directory at cache is up-to-date.

        Parameters:
          cache_path: full path to cache directory
          subdirectory: subpath inside cache
          server_url_prefix: url of "resources" folder at the server
        Returns:
          os.path.join(cache_path, subdirectory)
        Raises:
          CachingException if any problem occurs, unless
          tolerate_download_failures is set; then the error is only logged
          and the current cache content is used as-is.
        """
        full_path = os.path.join(cache_path, subdirectory)
        logger.debug("Trying to provide directory {0}".format(subdirectory))
        try:
            if full_path not in self.uptodate_paths:
                logger.debug("Checking if update is available for "
                             "directory {0}".format(full_path))
                # Need to check for updates at server
                remote_url = self.build_download_url(server_url_prefix,
                                                     subdirectory, self.HASH_SUM_FILE)
                memory_buffer = self.fetch_url(remote_url)
                remote_hash = memory_buffer.getvalue().strip()
                local_hash = self.read_hash_sum(full_path)
                if not local_hash or local_hash != remote_hash:
                    logger.debug("Updating directory {0}".format(full_path))
                    download_url = self.build_download_url(server_url_prefix,
                                                           subdirectory, self.ARCHIVE_NAME)
                    membuffer = self.fetch_url(download_url)
                    self.invalidate_directory(full_path)
                    self.unpack_archive(membuffer, full_path)
                    # Hash is written last, so an interrupted update is
                    # retried on the next call.
                    self.write_hash_sum(full_path, remote_hash)
                # Finally consider cache directory up-to-date
                self.uptodate_paths.append(full_path)
        except CachingException as e:
            if self.tolerate_download_failures:
                # ignore
                logger.warning("Error occured during cache update. "
                               "Error tolerate setting is set to true, so"
                               " ignoring this error and continuing with current cache. "
                               "Error details: {0}".format(str(e)))
            else:
                raise  # we are not tolerant to exceptions, command execution will fail
        return full_path

    def build_download_url(self, server_url_prefix,
                           directory, filename):
        """
        Builds up a proper download url for file. Used for downloading files
        from the server.
          server_url_prefix - url of "resources" folder at the server
          directory - relative path
          filename - file inside directory we are trying to fetch
        """
        return "{0}/{1}/{2}".format(server_url_prefix,
                                    directory, filename)

    def fetch_url(self, url):
        """
        Fetches content on url to in-memory buffer and returns the resulting
        buffer (a StringIO.StringIO).
        Raises CachingException wrapping any error that occurs.
        """
        logger.debug("Trying to download {0}".format(url))
        try:
            memory_buffer = StringIO.StringIO()
            u = urllib2.urlopen(url, timeout=self.SOCKET_TIMEOUT)
            try:
                logger.debug("Connected with {0} with code {1}".format(u.geturl(),
                                                                       u.getcode()))
                buff = u.read(self.BLOCK_SIZE)
                while buff:  # an empty read signals end of stream
                    memory_buffer.write(buff)
                    buff = u.read(self.BLOCK_SIZE)
            finally:
                # Release the connection even when reading fails midway.
                u.close()
            return memory_buffer
        except Exception as err:
            raise CachingException("Can not download file from"
                                   " url {0} : {1}".format(url, str(err)))

    def read_hash_sum(self, directory):
        """
        Tries to read a hash sum from previously generated file. Returns string
        containing hash or None if the file cannot be read.
        """
        hash_file = os.path.join(directory, self.HASH_SUM_FILE)
        try:
            with open(hash_file) as fh:
                return fh.readline().strip()
        except Exception:
            # Best effort: a missing or unreadable hash file simply means
            # "no local hash" and forces a re-download.
            return None

    def write_hash_sum(self, directory, new_hash):
        """
        Writes new_hash to the hash file inside directory, replacing any
        previous content. Raises CachingException if the file can not be
        written.
        """
        hash_file = os.path.join(directory, self.HASH_SUM_FILE)
        try:
            with open(hash_file, "w") as fh:
                fh.write(new_hash)
        except Exception as err:
            raise CachingException("Can not write to file {0} : {1}".format(hash_file,
                                                                            str(err)))

    def invalidate_directory(self, directory):
        """
        Recursively removes directory content (if any). Also, creates
        directory and any parent directories if needed.
        Raises CachingException on any failure (e.g. permission problems).
        """
        logger.debug("Invalidating directory {0}".format(directory))
        try:
            if os.path.isfile(directory):  # It would be a strange situation
                os.unlink(directory)
            elif os.path.isdir(directory):
                shutil.rmtree(directory)
            # create directory itself and any parent directories
            os.makedirs(directory)
        except Exception as err:
            raise CachingException("Can not invalidate cache directory {0}: {1}".format(
                directory, str(err)))

    def unpack_archive(self, mem_buffer, target_directory):
        """
        Unpacks contents of in-memory buffer to file system.
        In-memory buffer is expected to contain a valid zip archive.
        Raises CachingException if the archive is invalid, cannot be
        extracted, or contains entries pointing outside target_directory.
        """
        try:
            zfile = zipfile.ZipFile(mem_buffer)
            try:
                target_root = os.path.abspath(target_directory)
                # Trailing separator so "/cache/x" does not match "/cache/xy".
                target_prefix = os.path.join(target_root, '')
                for name in zfile.namelist():
                    (dirname, filename) = os.path.split(name)
                    concrete_dir = os.path.abspath(os.path.join(target_directory, dirname))
                    # The archive comes from the network: refuse entries
                    # (absolute paths, "..") that would escape the target.
                    if concrete_dir != target_root and \
                            not concrete_dir.startswith(target_prefix):
                        raise ValueError("archive entry {0} points outside of "
                                         "target directory".format(name))
                    if not os.path.isdir(concrete_dir):
                        os.makedirs(concrete_dir)
                    logger.debug("Unpacking file {0} to {1}".format(name, concrete_dir))
                    if filename != '':  # directory-only entries have no file part
                        zfile.extract(name, target_directory)
            finally:
                zfile.close()
        except Exception as err:
            raise CachingException("Can not unpack zip file to "
                                   "directory {0} : {1}".format(
                                       target_directory, str(err)))