#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
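
# Periodically removes stale command-related files (command, output, error
# and structured-out files) from the agent's data directory so that it does
# not grow without bound.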
import logging
import os
import re
import threading
import time

import AmbariConfig

logger = logging.getLogger(__name__)


class DataCleaner(threading.Thread):
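  """
  Daemon thread that periodically deletes old command-related files from
  the agent's data directory and enforces a size cap on what remains.
  """
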
  # Per-command files written by the agent; only names matching these
  # patterns are ever considered for deletion.
  COMMAND_FILE_NAMES_PATTERN = r'errors-\d+\.txt|output-\d+\.txt|site-\d+\.pp|structured-out-\d+\.json|command-\d+\.json'
  AUTO_COMMAND_FILE_NAMES_PATTERN = \
    r'auto_command-\d+\.json|auto_errors-\d+\.txt|auto_output-\d+\.txt|auto_structured-out-\d+\.json'
  FILE_NAME_PATTERN = AUTO_COMMAND_FILE_NAMES_PATTERN + "|" + COMMAND_FILE_NAMES_PATTERN

  def __init__(self, config):
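    """
    Reads data_cleanup_max_age, data_cleanup_interval and
    cleanup_max_size_MB from the [agent] config section, clamping each
    value to the bound enforced below.
    """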
    threading.Thread.__init__(self)
    self.daemon = True
    logger.info('Data cleanup thread started')
    self.config = config

    self.file_max_age = config.get('agent', 'data_cleanup_max_age', 86400)
    self.file_max_age = int(self.file_max_age) if self.file_max_age else None
    if self.file_max_age is None or self.file_max_age < 86400:  # keep files for at least 24h
      logger.warning('The minimum value allowed for data_cleanup_max_age is 1 '
                     'day. Setting data_cleanup_max_age to 86400.')
      self.file_max_age = 86400

    self.cleanup_interval = config.get('agent', 'data_cleanup_interval', 3600)
    self.cleanup_interval = int(self.cleanup_interval) if self.cleanup_interval else None
    if self.cleanup_interval is None or self.cleanup_interval < 3600:  # wait at least 1 hour between runs
      logger.warning('The minimum value allowed for data_cleanup_interval is 1 '
                     'hour. Setting data_cleanup_interval to 3600.')
      self.cleanup_interval = 3600

    self.cleanup_max_size_MB = config.get('agent', 'data_cleanup_max_size_MB', 10000)
    self.cleanup_max_size_MB = int(self.cleanup_max_size_MB) if self.cleanup_max_size_MB else None
    if self.cleanup_max_size_MB is None or self.cleanup_max_size_MB > 10000:  # no more than 10 GB
      logger.warning('The maximum value allowed for cleanup_max_size_MB is 10000 MB (10 GB). '
                     'Setting cleanup_max_size_MB to 10000.')
      self.cleanup_max_size_MB = 10000

    self.data_dir = config.get('agent', 'prefix')
    self.compiled_pattern = re.compile(self.FILE_NAME_PATTERN)
    self.stopped = False

  def __del__(self):
    logger.info('Data cleanup thread killed.')

  def cleanup(self):
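    """
    Pass 1: delete every matching file older than file_max_age.
    Pass 2: if the surviving files still occupy more than
    cleanup_max_size_MB in total, delete the oldest until the total size
    drops below that cap.
    """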
    logger.debug("Cleaning up inside directory " + self.data_dir)
    now = time.time()
    total_size_bytes = 0
    file_path_to_timestamp = {}
    file_path_to_size = {}

    for root, dirs, files in os.walk(self.data_dir):
      for f in files:
        file_path = os.path.join(root, f)
        if self.compiled_pattern.match(f):
          try:
            file_age = now - os.path.getmtime(file_path)
            if file_age > self.file_max_age:
              os.remove(file_path)
              logger.debug('Removed file: ' + file_path)
            else:
              # The file survived the age-based pass; record its age and size
              # so the second pass can prune oldest files first if needed.
              file_size = os.path.getsize(file_path)
              total_size_bytes += file_size
              file_path_to_timestamp[file_path] = file_age
              file_path_to_size[file_path] = file_size
          except Exception:
            logger.error('Error when removing file: ' + file_path)

    target_size_bytes = self.cleanup_max_size_MB * 1000000
    if file_path_to_timestamp and total_size_bytes > target_size_bytes:
      logger.info("DataCleaner values need to be more aggressive. Current size in bytes for all log files is %d, "
                  "and will try to clean to reach %d bytes." % (total_size_bytes, target_size_bytes))
      # Prune oldest files first (largest age sorts first).
      count = 0
      file_path_oldest_first_list = sorted(file_path_to_timestamp, key=file_path_to_timestamp.get, reverse=True)
      for file_path in file_path_oldest_first_list:
        try:
          os.remove(file_path)
          total_size_bytes -= file_path_to_size[file_path]
          count += 1
          if total_size_bytes <= target_size_bytes:
            # Finally reached below the cap.
            break
        except Exception:
          pass
      else:
        # Deleted every candidate and still did not get below the cap.
        logger.warning("DataCleaner deleted an additional %d files, currently log files occupy %d bytes." %
                       (count, total_size_bytes))

  def run(self):
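    """Runs cleanup() every cleanup_interval seconds until stopped."""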
    while not self.stopped:
      logger.info('Data cleanup started')
      self.cleanup()
      logger.info('Data cleanup finished')
      time.sleep(self.cleanup_interval)


def main():
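  # Standalone entry point: runs a single DataCleaner against the agent's
  # default configuration and blocks until it exits.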
  data_cleaner = DataCleaner(AmbariConfig.config)
  data_cleaner.start()
  data_cleaner.join()

if __name__ == "__main__":
  main()