blob: ea214b04e1cad7b817ab188d7087396bbf29c5b5 [file] [log] [blame]
#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
import sys
# Refuse to run on an interpreter older than the minimum supported release.
version_min = [2, 7]
version_info = sys.version_info
# Tuples compare element by element, so this orders by major version first
# and falls back to the minor version only when the majors are equal.
version_error = (version_info[0], version_info[1]) < (version_min[0], version_min[1])
if version_error:
    print('Python minimum requirement is version ' + '.'.join(str(part) for part in version_min[:2]))
    sys.exit(1)
import argparse
import os
from argparse import RawDescriptionHelpFormatter
from ducc_util import *
class DuccRsync(DuccUtil):
    """Synchronize DUCC directories from the present node to other nodes.

    Nodes are treated as head-only, agent-only, or head+agent ("dual"); each
    kind receives its own list of directories/files via rsync.

    NOTE(review): self.localhost, self.ducc_properties and self.read_nodefile
    are not defined in this class - presumably inherited from DuccUtil.
    DUCC_HOME, subprocess and find_ducc_uid are presumably supplied by
    'from ducc_util import *'.
    """

    # handed straight to DuccUtil.__init__; its meaning is defined there
    merge = True

    # property keys used to discover the head node(s)
    key_ducc_head = 'ducc.head'
    key_ducc_head_reliable_list = 'ducc.head.reliable.list'

    rsync_cmd = 'rsync'
    rsync_flags = '-avz --delete --ignore-errors'

    # Entries (relative to DUCC_HOME) pushed to head nodes.
    # Commented-out entries are not synchronized.
    head_dirs_list = [
        'admin',
        'apache-uima',
        'bin',
        'cassandra-server',
        'cron',
        'docs',
        'duccling',
        'examples',
        #'history',
        'INSTALL',
        'issuesFixed',
        'lib',
        'LICENSE',
        #'logs',
        'NOTICE',
        'README',
        'RELEASE_NOTES.html',
        'resources',
        'resources.private',
        #'state',
        'state/duccling.version',
        'webserver',
    ]

    # Entries (relative to DUCC_HOME) pushed to agent nodes.
    # Commented-out entries are not synchronized.
    agent_dirs_list = [
        'admin',
        'apache-uima',
        'bin',
        #'cassandra-server',
        'cron',
        'docs',
        'duccling',
        'examples',
        #'history',
        'INSTALL',
        'issuesFixed',
        'lib',
        'LICENSE',
        #'logs',
        'NOTICE',
        'README',
        'RELEASE_NOTES.html',
        'resources',
        'resources.private/ducc-broker-credentials.properties',
        #'state',
        'state/duccling.version',
        #'webserver',
    ]

    # a node that is both head and agent receives the head list
    dual_dirs_list = head_dirs_list

    # blank separated forms, as used in help text and by rsync()
    head_dirs = ' '.join(head_dirs_list)
    agent_dirs = ' '.join(agent_dirs_list)
    dual_dirs = ' '.join(dual_dirs_list)

    def __init__(self):
        """Initialize the DuccUtil base, passing the class-level merge flag."""
        DuccUtil.__init__(self, self.merge)

    def _fn(self):
        """Return the last '/'-separated component of this file's path."""
        return __file__.split('/')[-1]

    # extra help!
    def get_epilog(self):
        """Return the text shown after the argparse option help."""
        return 'Purpose: synchronize DUCC directory(s) from present node to other head & agent nodes.'

    # parse command line
    def get_args(self):
        """Build the argument parser and parse the command line into self.args.

        self.head_nodes and self.agent_nodes must already be set (main() does
        this) because their values are embedded in the help text.

        Raises:
            Exception: if neither --head-nodes nor --agent-nodes was given.
        """
        self.help_head_nodes = 'Blank separated list of head nodes to synchronize, default:"None"; specify "all" for '+' '.join(self.head_nodes)
        self.help_agent_nodes = 'Blank separated list of agent nodes to synchronize, default:"None"; specify "all" for '+' '.join(self.agent_nodes)
        self.help_head_dirs = 'Blank separated list of head directories (and files) to synchronize, default="'+self.head_dirs+'"'
        self.help_agent_dirs = 'Blank separated list of agent directories (and files) to synchronize, default="'+self.agent_dirs+'"'
        self.help_debug = 'Display debugging messages.'
        self.help_quiet = 'Suppress informational messages.'
        self.parser = argparse.ArgumentParser(formatter_class=RawDescriptionHelpFormatter, epilog=self.get_epilog())
        self.parser.add_argument('--head-nodes', action='store', help=self.help_head_nodes)
        self.parser.add_argument('--agent-nodes', action='store', help=self.help_agent_nodes)
        self.parser.add_argument('--head-dirs', action='store', help=self.help_head_dirs)
        self.parser.add_argument('--agent-dirs', action='store', help=self.help_agent_dirs)
        self.parser.add_argument('--debug', action='store_true', help=self.help_debug)
        self.parser.add_argument('--quiet', action='store_true', help=self.help_quiet)
        self.args = self.parser.parse_args()
        if self.args.head_nodes is None and self.args.agent_nodes is None:
            raise Exception('--head-nodes and/or --agent-nodes must be specified')

    # conditionally add node to list of nodes to be updated
    def add(self, list, node):
        """Append node to list unless it is the local host or already present.

        The parameter name 'list' shadows the builtin; it is kept so that
        existing keyword callers continue to work.
        """
        if node == self.localhost:
            return
        if node not in list:
            list.append(node)

    # create list of head nodes
    def get_head_node_list(self):
        """Return the head nodes named in the DUCC properties.

        When the reliable list property is non-empty its members are used;
        otherwise the single 'ducc.head' value is used. The local host is
        never included (see add()).
        """
        node_list = []
        head = self.ducc_properties.get(self.key_ducc_head)
        reliable_string = self.ducc_properties.get(self.key_ducc_head_reliable_list)
        # Guard against an absent property: calling split() on a missing value
        # would raise before any work is done.
        # NOTE(review): assumes get() yields None/'' for an absent key - confirm.
        reliable = reliable_string.split() if reliable_string else []
        if reliable:
            for node in reliable:
                self.add(node_list, node)
        elif head:
            self.add(node_list, head)
        return node_list

    # create list of agent nodes
    def get_agent_node_list(self):
        """Return the agent nodes found via read_nodefile('ducc.nodes', ...).

        The local host is never included (see add()).
        """
        node_list = []
        ducc_nodes = 'ducc.nodes'
        nodefiles = ducc_nodes.split()
        node_map = {}
        for nodefile in nodefiles:
            # read_nodefile returns a pair; only the accumulated map is used
            _nodes, node_map = self.read_nodefile(nodefile, node_map)
        for key in node_map:
            for node in node_map[key]:
                self.add(node_list, node)
        return node_list

    # use user specified head nodes, else from ducc.properties
    def resolve_head_nodes(self):
        """Apply --head-nodes: absent -> None, 'all' -> keep discovered list,
        anything else -> the blank separated nodes given by the user."""
        requested = self.args.head_nodes
        if requested is None:
            self.head_nodes = None
        elif requested.lower() != 'all':
            self.head_nodes = requested.split()

    # use user specified agent nodes, else from ducc.nodes
    def resolve_agent_nodes(self):
        """Apply --agent-nodes: absent -> None, 'all' -> keep discovered list,
        anything else -> the blank separated nodes given by the user."""
        requested = self.args.agent_nodes
        if requested is None:
            self.agent_nodes = None
        elif requested.lower() != 'all':
            self.agent_nodes = requested.split()

    # use user specified directories, else the class defaults
    def _resolve_dirs(self):
        """Honor --head-dirs / --agent-dirs, which were previously parsed but
        never consulted.

        Head+agent nodes follow the head selection, mirroring the class-level
        default dual_dirs_list = head_dirs_list.
        """
        if self.args.head_dirs is not None:
            self.head_dirs = self.args.head_dirs
            self.dual_dirs = self.args.head_dirs
        if self.args.agent_dirs is not None:
            self.agent_dirs = self.args.agent_dirs

    # update nodes that are head+agent
    def update_dual(self, user):
        """rsync dual_dirs to every node present in both node lists."""
        if self.head_nodes is None or self.agent_nodes is None:
            return
        for node in self.head_nodes:
            if node in self.agent_nodes:
                if self.args.debug:
                    print('update head+agent: '+node)
                self.rsync(node, user, self.dual_dirs)

    # update nodes that are head only
    def update_heads(self, user):
        """rsync head_dirs to every head node that is not also an agent node."""
        if self.head_nodes is None:
            return
        for node in self.head_nodes:
            # Decide per node: a single flag shared across iterations would
            # stay False after the first dual node and skip every later
            # head-only node.
            if self.agent_nodes is not None and node in self.agent_nodes:
                # dual should have done this one
                continue
            if self.args.debug:
                print('update head: '+node)
            self.rsync(node, user, self.head_dirs)

    # update nodes that are agent only
    def update_agents(self, user):
        """rsync agent_dirs to every agent node that is not also a head node."""
        if self.agent_nodes is None:
            return
        for node in self.agent_nodes:
            # Decide per node (see update_heads for why).
            if self.head_nodes is not None and node in self.head_nodes:
                # dual should have done this one
                continue
            if self.args.debug:
                print('update agent: '+node)
            self.rsync(node, user, self.agent_dirs)

    # update the specified node, subdirs
    def rsync(self, node, user, subdirs):
        """Run one rsync per entry of subdirs from DUCC_HOME to user@node.

        Args:
            node: destination host name.
            user: remote user id.
            subdirs: blank separated entries relative to DUCC_HOME.

        The command and its captured output are printed when rsync exits
        non-zero, or always when --debug is set.
        """
        # function-scope import keeps this block free of new file-level imports
        import shlex
        base_argv = shlex.split(self.rsync_cmd) + shlex.split(self.rsync_flags)
        for subdir in subdirs.split():
            if not self.args.quiet:
                print('rsync '+user+' '+node+' '+subdir)
            source = os.path.join(DUCC_HOME, subdir)
            # rsync places the source's last path component inside the
            # destination directory, so a nested entry such as 'state/x' must
            # target DUCC_HOME/state/ or it would land in DUCC_HOME itself.
            # The trailing separator marks the destination as a directory.
            parent = os.path.dirname(subdir)
            if parent:
                remote_dir = os.path.join(DUCC_HOME, parent, '')
            else:
                remote_dir = DUCC_HOME
            argv = base_argv + [source, user+'@'+node+':'+remote_dir]
            # kept as a single string purely for display
            cmd = ' '.join(argv)
            # An argv list (no shell) means node names and paths are never
            # re-interpreted by a shell.
            proc = subprocess.Popen(argv, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            lines = [line.strip() for line in proc.stdout]
            proc.wait()
            if proc.returncode != 0 or self.args.debug:
                self.rsync_display(cmd, lines)

    def rsync_display(self, cmd, lines):
        """Print the rsync command followed by each captured output line."""
        print(cmd)
        for line in lines:
            print(line)

    # do rsync of dir(s) from present node to peer(s)
    def main(self, argv):
        """Discover nodes, parse arguments and push to dual, head and agent nodes.

        argv is accepted for interface compatibility; argparse reads the
        process command line itself. Any exception is printed rather than
        propagated.
        """
        try:
            # the node lists must exist before get_args(): they appear in help
            self.head_nodes = self.get_head_node_list()
            self.agent_nodes = self.get_agent_node_list()
            self.get_args()
            self.resolve_head_nodes()
            self.resolve_agent_nodes()
            self._resolve_dirs()
            user = find_ducc_uid()
            self.update_dual(user)
            self.update_heads(user)
            self.update_agents(user)
        except Exception as e:
            print(e)
# Script entry point: construct the tool first, then hand it the command-line
# arguments (everything after the program name).
if __name__ == '__main__':
    instance = DuccRsync()
    cli_args = sys.argv[1:]
    instance.main(cli_args)