| #!/usr/bin/env python |
| |
| ''' |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ''' |
| import ConfigParser |
| import os |
| |
| import pprint |
| |
| from unittest import TestCase |
| import threading |
| import tempfile |
| import time |
| from threading import Thread |
| |
| from FileCache import FileCache, CachingException |
| from AmbariConfig import AmbariConfig |
| from mock.mock import MagicMock, patch |
| import StringIO |
| import sys |
| import shutil |
| |
| |
| class TestFileCache(TestCase): |
| |
| def setUp(self): |
| # disable stdout |
| out = StringIO.StringIO() |
| sys.stdout = out |
| # generate sample config |
| tmpdir = tempfile.gettempdir() |
| self.config = ConfigParser.RawConfigParser() |
| self.config.add_section('agent') |
| self.config.set('agent', 'prefix', tmpdir) |
| self.config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") |
| self.config.set('agent', 'tolerate_download_failures', "true") |
| |
| |
| def test_reset(self): |
| fileCache = FileCache(self.config) |
| fileCache.uptodate_paths.append('dummy-path') |
| fileCache.reset() |
| self.assertFalse(fileCache.uptodate_paths) |
| |
| |
| @patch.object(FileCache, "provide_directory") |
| def test_get_service_base_dir(self, provide_directory_mock): |
| provide_directory_mock.return_value = "dummy value" |
| fileCache = FileCache(self.config) |
| command = { |
| 'commandParams' : { |
| 'service_package_folder' : 'HDP/2.1.1/services/ZOOKEEPER/package' |
| } |
| } |
| res = fileCache.get_service_base_dir(command, "server_url_pref") |
| self.assertEquals( |
| pprint.pformat(provide_directory_mock.call_args_list[0][0]), |
| "('/var/lib/ambari-agent/cache',\n " |
| "'stacks/HDP/2.1.1/services/ZOOKEEPER/package',\n" |
| " 'server_url_pref')") |
| self.assertEquals(res, "dummy value") |
| |
| |
| @patch.object(FileCache, "provide_directory") |
| def test_get_hook_base_dir(self, provide_directory_mock): |
| fileCache = FileCache(self.config) |
| # Check missing parameter |
| command = { |
| 'commandParams' : { |
| } |
| } |
| base = fileCache.get_hook_base_dir(command, "server_url_pref") |
| self.assertEqual(base, None) |
| self.assertFalse(provide_directory_mock.called) |
| |
| # Check existing dir case |
| command = { |
| 'commandParams' : { |
| 'hooks_folder' : 'HDP/2.1.1/hooks' |
| } |
| } |
| provide_directory_mock.return_value = "dummy value" |
| fileCache = FileCache(self.config) |
| res = fileCache.get_hook_base_dir(command, "server_url_pref") |
| self.assertEquals( |
| pprint.pformat(provide_directory_mock.call_args_list[0][0]), |
| "('/var/lib/ambari-agent/cache', " |
| "'stacks/HDP/2.1.1/hooks', " |
| "'server_url_pref')") |
| self.assertEquals(res, "dummy value") |
| |
| |
| @patch.object(FileCache, "provide_directory") |
| def test_get_custom_actions_base_dir(self, provide_directory_mock): |
| provide_directory_mock.return_value = "dummy value" |
| fileCache = FileCache(self.config) |
| res = fileCache.get_custom_actions_base_dir("server_url_pref") |
| self.assertEquals( |
| pprint.pformat(provide_directory_mock.call_args_list[0][0]), |
| "('/var/lib/ambari-agent/cache', 'custom_actions', 'server_url_pref')") |
| self.assertEquals(res, "dummy value") |
| |
| |
| @patch.object(FileCache, "build_download_url") |
| @patch.object(FileCache, "fetch_url") |
| @patch.object(FileCache, "read_hash_sum") |
| @patch.object(FileCache, "invalidate_directory") |
| @patch.object(FileCache, "unpack_archive") |
| @patch.object(FileCache, "write_hash_sum") |
| def test_provide_directory(self, write_hash_sum_mock, unpack_archive_mock, |
| invalidate_directory_mock, |
| read_hash_sum_mock, fetch_url_mock, |
| build_download_url_mock): |
| build_download_url_mock.return_value = "http://dummy-url/" |
| HASH1 = "hash1" |
| membuffer = MagicMock() |
| membuffer.getvalue.return_value.strip.return_value = HASH1 |
| fileCache = FileCache(self.config) |
| |
| # Test uptodate dirs after start |
| self.assertFalse(fileCache.uptodate_paths) |
| |
| # Test initial downloading (when dir does not exist) |
| fetch_url_mock.return_value = membuffer |
| read_hash_sum_mock.return_value = "hash2" |
| res = fileCache.provide_directory("cache_path", "subdirectory", |
| "server_url_prefix") |
| self.assertTrue(invalidate_directory_mock.called) |
| self.assertTrue(write_hash_sum_mock.called) |
| self.assertEquals(fetch_url_mock.call_count, 2) |
| self.assertEquals(pprint.pformat(fileCache.uptodate_paths), |
| "['cache_path/subdirectory']") |
| self.assertEquals(res, 'cache_path/subdirectory') |
| |
| fetch_url_mock.reset_mock() |
| write_hash_sum_mock.reset_mock() |
| invalidate_directory_mock.reset_mock() |
| unpack_archive_mock.reset_mock() |
| |
| # Test cache invalidation when local hash does not differ |
| fetch_url_mock.return_value = membuffer |
| read_hash_sum_mock.return_value = HASH1 |
| fileCache.reset() |
| |
| res = fileCache.provide_directory("cache_path", "subdirectory", |
| "server_url_prefix") |
| self.assertFalse(invalidate_directory_mock.called) |
| self.assertFalse(write_hash_sum_mock.called) |
| self.assertEquals(fetch_url_mock.call_count, 1) |
| self.assertEquals(pprint.pformat(fileCache.uptodate_paths), |
| "['cache_path/subdirectory']") |
| self.assertEquals(res, 'cache_path/subdirectory') |
| |
| fetch_url_mock.reset_mock() |
| write_hash_sum_mock.reset_mock() |
| invalidate_directory_mock.reset_mock() |
| unpack_archive_mock.reset_mock() |
| |
| # Test execution path when path is up-to date (already checked) |
| res = fileCache.provide_directory("cache_path", "subdirectory", |
| "server_url_prefix") |
| self.assertFalse(invalidate_directory_mock.called) |
| self.assertFalse(write_hash_sum_mock.called) |
| self.assertEquals(fetch_url_mock.call_count, 0) |
| self.assertEquals(pprint.pformat(fileCache.uptodate_paths), |
| "['cache_path/subdirectory']") |
| self.assertEquals(res, 'cache_path/subdirectory') |
| |
| # Check exception handling when tolerance is disabled |
| self.config.set('agent', 'tolerate_download_failures', "false") |
| fetch_url_mock.side_effect = self.caching_exc_side_effect |
| fileCache = FileCache(self.config) |
| try: |
| fileCache.provide_directory("cache_path", "subdirectory", |
| "server_url_prefix") |
| self.fail('CachingException not thrown') |
| except CachingException: |
| pass # Expected |
| except Exception, e: |
| self.fail('Unexpected exception thrown:' + str(e)) |
| |
| # Check that unexpected exceptions are still propagated when |
| # tolerance is enabled |
| self.config.set('agent', 'tolerate_download_failures', "false") |
| fetch_url_mock.side_effect = self.exc_side_effect |
| fileCache = FileCache(self.config) |
| try: |
| fileCache.provide_directory("cache_path", "subdirectory", |
| "server_url_prefix") |
| self.fail('Exception not thrown') |
| except Exception: |
| pass # Expected |
| |
| |
| # Check exception handling when tolerance is enabled |
| self.config.set('agent', 'tolerate_download_failures', "true") |
| fetch_url_mock.side_effect = self.caching_exc_side_effect |
| fileCache = FileCache(self.config) |
| res = fileCache.provide_directory("cache_path", "subdirectory", |
| "server_url_prefix") |
| self.assertEquals(res, 'cache_path/subdirectory') |
| |
| |
| def test_build_download_url(self): |
| fileCache = FileCache(self.config) |
| url = fileCache.build_download_url('http://localhost:8080/resources/', |
| 'stacks/HDP/2.1.1/hooks', 'archive.zip') |
| self.assertEqual(url, |
| 'http://localhost:8080/resources//stacks/HDP/2.1.1/hooks/archive.zip') |
| |
| |
| @patch("urllib2.urlopen") |
| def test_fetch_url(self, urlopen_mock): |
| fileCache = FileCache(self.config) |
| remote_url = "http://dummy-url/" |
| # Test normal download |
| test_str = 'abc' * 100000 # Very long string |
| test_string_io = StringIO.StringIO(test_str) |
| test_buffer = MagicMock() |
| test_buffer.read.side_effect = test_string_io.read |
| urlopen_mock.return_value = test_buffer |
| |
| memory_buffer = fileCache.fetch_url(remote_url) |
| |
| self.assertEquals(memory_buffer.getvalue(), test_str) |
| self.assertEqual(test_buffer.read.call_count, 20) # depends on buffer size |
| # Test exception handling |
| test_buffer.read.side_effect = self.exc_side_effect |
| try: |
| fileCache.fetch_url(remote_url) |
| self.fail('CachingException not thrown') |
| except CachingException: |
| pass # Expected |
| except Exception, e: |
| self.fail('Unexpected exception thrown:' + str(e)) |
| |
| |
| def test_read_write_hash_sum(self): |
| tmpdir = tempfile.mkdtemp() |
| dummyhash = "DUMMY_HASH" |
| fileCache = FileCache(self.config) |
| fileCache.write_hash_sum(tmpdir, dummyhash) |
| newhash = fileCache.read_hash_sum(tmpdir) |
| self.assertEquals(newhash, dummyhash) |
| shutil.rmtree(tmpdir) |
| # Test read of not existing file |
| newhash = fileCache.read_hash_sum(tmpdir) |
| self.assertEquals(newhash, None) |
| # Test write to not existing file |
| with patch("__builtin__.open") as open_mock: |
| open_mock.side_effect = self.exc_side_effect |
| try: |
| fileCache.write_hash_sum(tmpdir, dummyhash) |
| self.fail('CachingException not thrown') |
| except CachingException: |
| pass # Expected |
| except Exception, e: |
| self.fail('Unexpected exception thrown:' + str(e)) |
| |
| |
| @patch("os.path.isfile") |
| @patch("os.path.isdir") |
| @patch("os.unlink") |
| @patch("shutil.rmtree") |
| @patch("os.makedirs") |
| def test_invalidate_directory(self, makedirs_mock, rmtree_mock, |
| unlink_mock, isdir_mock, isfile_mock): |
| fileCache = FileCache(self.config) |
| # Test execution flow if path points to file |
| isfile_mock.return_value = True |
| isdir_mock.return_value = False |
| |
| fileCache.invalidate_directory("dummy-dir") |
| |
| self.assertTrue(unlink_mock.called) |
| self.assertFalse(rmtree_mock.called) |
| self.assertTrue(makedirs_mock.called) |
| |
| unlink_mock.reset_mock() |
| rmtree_mock.reset_mock() |
| makedirs_mock.reset_mock() |
| |
| # Test execution flow if path points to dir |
| isfile_mock.return_value = False |
| isdir_mock.return_value = True |
| |
| fileCache.invalidate_directory("dummy-dir") |
| |
| self.assertFalse(unlink_mock.called) |
| self.assertTrue(rmtree_mock.called) |
| self.assertTrue(makedirs_mock.called) |
| |
| unlink_mock.reset_mock() |
| rmtree_mock.reset_mock() |
| makedirs_mock.reset_mock() |
| |
| # Test exception handling |
| makedirs_mock.side_effect = self.exc_side_effect |
| try: |
| fileCache.invalidate_directory("dummy-dir") |
| self.fail('CachingException not thrown') |
| except CachingException: |
| pass # Expected |
| except Exception, e: |
| self.fail('Unexpected exception thrown:' + str(e)) |
| |
| |
| def test_unpack_archive(self): |
| tmpdir = tempfile.mkdtemp() |
| dummy_archive = os.path.join("ambari_agent", "dummy_files", |
| "dummy_archive.zip") |
| # Test normal flow |
| with open(dummy_archive, "r") as f: |
| data = f.read(os.path.getsize(dummy_archive)) |
| membuf = StringIO.StringIO(data) |
| |
| fileCache = FileCache(self.config) |
| fileCache.unpack_archive(membuf, tmpdir) |
| # Count summary size of unpacked files: |
| total_size = 0 |
| total_files = 0 |
| total_dirs = 0 |
| for dirpath, dirnames, filenames in os.walk(tmpdir): |
| total_dirs += 1 |
| for f in filenames: |
| fp = os.path.join(dirpath, f) |
| total_size += os.path.getsize(fp) |
| total_files += 1 |
| self.assertEquals(total_size, 51258L) |
| self.assertEquals(total_files, 28) |
| self.assertEquals(total_dirs, 8) |
| shutil.rmtree(tmpdir) |
| |
| # Test exception handling |
| with patch("os.path.isdir") as isdir_mock: |
| isdir_mock.side_effect = self.exc_side_effect |
| try: |
| fileCache.unpack_archive(membuf, tmpdir) |
| self.fail('CachingException not thrown') |
| except CachingException: |
| pass # Expected |
| except Exception, e: |
| self.fail('Unexpected exception thrown:' + str(e)) |
| |
| |
| def tearDown(self): |
| # enable stdout |
| sys.stdout = sys.__stdout__ |
| |
| |
| def exc_side_effect(self, *a): |
| raise Exception("horrible_exc") |
| |
| def caching_exc_side_effect(self, *a): |
| raise CachingException("horrible_caching_exc") |