#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------

import sys

# Interpreter gate: placed ahead of the remaining imports so that an
# unsupported interpreter gets the message below and exit status 1.
# Oldest supported interpreter, as [major, minor].
version_min = [2, 7]
version_info = sys.version_info
# Tuples compare element by element: major first, then minor.
version_error = tuple(version_info[:2]) < tuple(version_min)
if version_error:
    print('Python minimum requirement is version ' + '.'.join(map(str, version_min)))
    sys.exit(1)

import argparse
import os
import subprocess

from argparse import RawDescriptionHelpFormatter

from ducc_util import *

class DuccRsync(DuccUtil):
    """Synchronize DUCC directories from the present node to other nodes.

    Head-only, agent-only and combined head+agent nodes each receive their
    own set of DUCC_HOME sub-directories (and files), pushed with rsync.
    Statements use the print(...) / "except ... as" forms, which parse on
    Python 2.7 as well as Python 3.
    """

    # Passed to DuccUtil.__init__; its meaning is defined by DuccUtil.
    merge = True

    # Property names looked up in self.ducc_properties.
    key_ducc_head = 'ducc.head'
    key_ducc_head_reliable_list = 'ducc.head.reliable.list'

    rsync_cmd = 'rsync'
    rsync_flags = '-avz --delete --ignore-errors'

    # Entries (relative to DUCC_HOME) pushed to head nodes.  Commented-out
    # names are not synchronized.
    head_dirs_list = [
        'admin',
        'apache-uima',
        'bin',
        'cassandra-server',
        'cron',
        'docs',
        'duccling',
        'examples',
        #'history',
        'INSTALL',
        'issuesFixed',
        'lib',
        'LICENSE',
        #'logs',
        'NOTICE',
        'README',
        'RELEASE_NOTES.html',
        'resources',
        'resources.private',
        #'state',
        'state/duccling.version',
        'webserver',
    ]

    # Entries (relative to DUCC_HOME) pushed to agent nodes.  Commented-out
    # names are not synchronized.
    agent_dirs_list = [
        'admin',
        'apache-uima',
        'bin',
        #'cassandra-server',
        'cron',
        'docs',
        'duccling',
        'examples',
        #'history',
        'INSTALL',
        'issuesFixed',
        'lib',
        'LICENSE',
        #'logs',
        'NOTICE',
        'README',
        'RELEASE_NOTES.html',
        'resources',
        'resources.private/ducc-broker-credentials.properties',
        #'state',
        'state/duccling.version',
        #'webserver',
    ]

    # Nodes that are both head and agent receive the head set.
    dual_dirs_list = head_dirs_list

    # Blank-separated forms, used as defaults and in the help text.
    head_dirs = ' '.join(head_dirs_list)
    agent_dirs = ' '.join(agent_dirs_list)
    dual_dirs = ' '.join(dual_dirs_list)

    def __init__(self):
        """Initialise the DuccUtil base with the class-level merge flag."""
        DuccUtil.__init__(self, self.merge)

    def _fn(self):
        """Return the file name of this script without its directory."""
        return os.path.basename(__file__)

    # extra help!
    def get_epilog(self):
        """Return the epilog text shown at the end of --help output."""
        return 'Purpose: synchronize DUCC directory(s) from present node to other head & agent nodes.'

    # parse command line
    def get_args(self, argv=None):
        """Parse the command line into self.args.

        argv -- list of argument strings; None (the default) lets argparse
                read sys.argv[1:].

        Expects self.head_nodes and self.agent_nodes to already hold the
        configured node lists, because they are quoted in the help text.

        Raises Exception when neither --head-nodes nor --agent-nodes is given.
        """
        self.help_head_nodes = 'Blank separated list of head nodes to synchronize, default:"None"; specify "all" for '+' '.join(self.head_nodes)
        self.help_agent_nodes = 'Blank separated list of agent nodes to synchronize, default:"None"; specify "all" for '+' '.join(self.agent_nodes)

        self.help_head_dirs = 'Blank separated list of head directories (and files) to synchronize, default="'+self.head_dirs+'"'
        self.help_agent_dirs = 'Blank separated list of agent directories (and files) to synchronize, default="'+self.agent_dirs+'"'

        self.help_debug = 'Display debugging messages.'
        self.help_quiet = 'Suppress informational messages.'

        self.parser = argparse.ArgumentParser(formatter_class=RawDescriptionHelpFormatter, epilog=self.get_epilog())

        self.parser.add_argument('--head-nodes', action='store', help=self.help_head_nodes)
        self.parser.add_argument('--agent-nodes', action='store', help=self.help_agent_nodes)
        self.parser.add_argument('--head-dirs', action='store', help=self.help_head_dirs)
        self.parser.add_argument('--agent-dirs', action='store', help=self.help_agent_dirs)

        self.parser.add_argument('--debug', action='store_true', help=self.help_debug)
        self.parser.add_argument('--quiet', action='store_true', help=self.help_quiet)

        self.args = self.parser.parse_args(argv)

        if self.args.head_nodes is None and self.args.agent_nodes is None:
            raise Exception('--head-nodes and/or --agent-nodes must be specified')

    # conditionally add node to list of nodes to be updated
    def add(self, list, node):
        """Append node to list unless it is self.localhost or already present."""
        if node != self.localhost and node not in list:
            list.append(node)

    # create list of head nodes
    def get_head_node_list(self):
        """Return the configured head nodes, excluding the local host.

        The entries of the 'ducc.head.reliable.list' property are used when
        it names at least one node; otherwise the single 'ducc.head' value.
        """
        node_list = []
        head = self.ducc_properties.get(self.key_ducc_head)
        reliable_string = self.ducc_properties.get(self.key_ducc_head_reliable_list)
        # Guard in case the lookup yields None/empty for an absent property
        # (the properties object is defined outside this file).
        reliable = reliable_string.split() if reliable_string else []
        if reliable:
            for node in reliable:
                self.add(node_list, node)
        elif head:
            self.add(node_list, head)
        return node_list

    # create list of agent nodes
    def get_agent_node_list(self):
        """Return the agent nodes read from 'ducc.nodes', excluding the local host."""
        node_list = []
        ducc_nodes = 'ducc.nodes'
        nodefiles = ducc_nodes.split()
        nodemap = {}
        for nodefile in nodefiles:
            # read_nodefile returns a pair; only the accumulated map is used.
            # Presumably it maps a node file to its nodes - verify in DuccUtil.
            _, nodemap = self.read_nodefile(nodefile, nodemap)
        for key in nodemap:
            for node in nodemap[key]:
                self.add(node_list, node)
        return node_list

    # use user specified head nodes, else from ducc.properties
    def resolve_head_nodes(self):
        """Apply --head-nodes: absent -> None, 'all' -> keep configured list,
        anything else -> the blank-separated names given."""
        requested = self.args.head_nodes
        if requested is None:
            self.head_nodes = None
        elif requested.lower() != 'all':
            self.head_nodes = requested.split()

    # use user specified agent nodes, else from ducc.nodes
    def resolve_agent_nodes(self):
        """Apply --agent-nodes: absent -> None, 'all' -> keep configured list,
        anything else -> the blank-separated names given."""
        requested = self.args.agent_nodes
        if requested is None:
            self.agent_nodes = None
        elif requested.lower() != 'all':
            self.agent_nodes = requested.split()

    # use user specified directories, else the class defaults
    def _resolve_dirs(self):
        """Apply --head-dirs / --agent-dirs when given.

        Combined head+agent nodes follow the head selection, mirroring
        dual_dirs_list = head_dirs_list.
        """
        if self.args.head_dirs is not None:
            self.head_dirs = self.args.head_dirs
            self.dual_dirs = self.args.head_dirs
        if self.args.agent_dirs is not None:
            self.agent_dirs = self.args.agent_dirs

    # update nodes that are head+agent
    def update_dual(self, user):
        """rsync the dual directory set to every node that is both head and agent."""
        if self.head_nodes is None or self.agent_nodes is None:
            return
        for node in self.head_nodes:
            if node in self.agent_nodes:
                if self.args.debug:
                    print('update head+agent: '+node)
                self.rsync(node, user, self.dual_dirs)

    # update nodes that are head only
    def update_heads(self, user):
        """rsync the head directory set to every node that is head only."""
        if self.head_nodes is None:
            return
        for node in self.head_nodes:
            # Decided per node: a combined node is left to update_dual and
            # must not suppress the head-only nodes that follow it.
            if self.agent_nodes is not None and node in self.agent_nodes:
                continue
            if self.args.debug:
                print('update head: '+node)
            self.rsync(node, user, self.head_dirs)

    # update nodes that are agent only
    def update_agents(self, user):
        """rsync the agent directory set to every node that is agent only."""
        if self.agent_nodes is None:
            return
        for node in self.agent_nodes:
            # Decided per node: a combined node is left to update_dual and
            # must not suppress the agent-only nodes that follow it.
            if self.head_nodes is not None and node in self.head_nodes:
                continue
            if self.args.debug:
                print('update agent: '+node)
            self.rsync(node, user, self.agent_dirs)

    # update the specified node, subdirs
    def rsync(self, node, user, subdirs):
        """Push each entry of subdirs from the local DUCC_HOME to user@node.

        node    -- target host name
        user    -- remote login name
        subdirs -- blank-separated paths relative to DUCC_HOME

        One rsync is run per entry.  Its output is shown when rsync exits
        non-zero or when --debug is set.
        """
        for subdir in subdirs.split():
            if not self.args.quiet:
                print('rsync '+user+' '+node+' '+subdir)
            source = os.path.join(DUCC_HOME, subdir)
            # rsync places the last component of the source inside the
            # destination directory, so the destination must be the entry's
            # own parent; otherwise nested entries such as
            # 'state/duccling.version' end up directly under DUCC_HOME.
            target = user+'@'+node+':'+os.path.dirname(source)+'/'
            # NOTE(review): the command is run through the shell; node, user
            # and directory names are assumed to be administrator-supplied.
            cmd = self.rsync_cmd+' '+self.rsync_flags+' '+source+' '+target
            proc = subprocess.Popen(cmd, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            lines = [line.strip() for line in proc.stdout]
            proc.wait()
            if proc.returncode != 0 or self.args.debug:
                self.rsync_display(cmd, lines)

    def rsync_display(self, cmd, lines):
        """Print the rsync command followed by its captured output lines."""
        print(cmd)
        for line in lines:
            print(line)

    # do rsync of dir(s) from present node to peer(s)
    def main(self, argv):
        """Run the synchronization described by argv.

        argv -- command-line arguments without the program name.

        Returns 0 on completion, 1 when an exception was reported.  A failing
        rsync for one entry is displayed but does not raise.
        """
        try:
            # Node lists are needed first: get_args quotes them in the help.
            self.head_nodes = self.get_head_node_list()
            self.agent_nodes = self.get_agent_node_list()
            self.get_args(argv)
            self._resolve_dirs()
            self.resolve_head_nodes()
            self.resolve_agent_nodes()
            user = find_ducc_uid()
            self.update_dual(user)
            self.update_heads(user)
            self.update_agents(user)
        except Exception as e:
            print(e)
            return 1
        return 0

if __name__ == '__main__':
    # Script entry point: hand the arguments (without the program name) to
    # DuccRsync.main and use its return value as the process exit status.
    # sys.exit(None) exits with status 0, so a main() that returns nothing
    # still reports success.
    instance = DuccRsync()
    sys.exit(instance.main(sys.argv[1:]))