blob: 0e25fbb941c75215fa9f6297a4a288685b7fd015 [file] [log] [blame]
#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
import sys
version_min = [2, 7]
version_info = sys.version_info
# Lexicographic (major, minor) comparison: a lower major fails outright, an
# equal major fails only on a lower minor.  tuple() makes the slice comparable
# whether version_info is a plain tuple or a named structure.
version_error = tuple(version_info[:2]) < tuple(version_min)
if version_error:
    print('Python minimum requirement is version ' + '.'.join([str(part) for part in version_min]))
    sys.exit(1)
import argparse
import os
from argparse import RawDescriptionHelpFormatter
from ducc_util import *
class DuccRsync(DuccUtil):
    """Push DUCC_HOME content from this node to other head and agent nodes.

    Copies are made with rsync over ssh as the DUCC user.  A node named as
    both a head node and an agent node ("dual") is updated once, with
    dual_dirs; head-only nodes get head_dirs; agent-only nodes get
    agent_dirs followed by agent_files.
    """

    # passed to DuccUtil.__init__
    merge = True

    # ducc.properties keys used to discover the default head node(s)
    key_ducc_head = 'ducc.head'
    key_ducc_head_reliable_list = 'ducc.head.reliable.list'

    # Remote-access commands.  Host keys are neither verified nor recorded
    # (StrictHostKeyChecking=no, UserKnownHostsFile=/dev/null).
    rsync_cmd = 'rsync'
    rsync_flags = '-e "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" -avz --links --delete --ignore-errors --timeout 20'
    symlink_cmd = 'rsync'
    symlink_flags = '-e "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" -vz --links --delete --ignore-errors --timeout 20'
    ssh_cmd = 'ssh'
    ssh_flags = '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
    mkdir_cmd = 'mkdir'
    mkdir_flags = '-p'

    # Paths (relative to DUCC_HOME) pushed to head nodes.  Commented-out
    # entries are deliberately not synchronized.
    head_dirs_list = [
        'admin',
        'apache-uima',
        'bin',
        'cassandra-server',
        'cron',
        'docs',
        'duccling',
        'examples',
        #'history',
        'INSTALL',
        'issuesFixed',
        'lib',
        'LICENSE',
        #'logs',
        'NOTICE',
        'README',
        'RELEASE_NOTES.html',
        'resources',
        'resources.private',
        #'state',
        'state/duccling.version',
        'webserver',
    ]

    # Paths (relative to DUCC_HOME) pushed to agent-only nodes.
    agent_dirs_list = [
        'admin',
        'apache-uima',
        'bin',
        #'cassandra-server',
        'cron',
        'docs',
        'duccling',
        'examples',
        #'history',
        'INSTALL',
        'issuesFixed',
        'lib',
        'LICENSE',
        #'logs',
        'NOTICE',
        'README',
        'RELEASE_NOTES.html',
        'resources',
        #'resources.private',
        #'state',
        #'webserver',
    ]

    # Individual files pushed to agent-only nodes; their remote parent
    # directory is created first.
    agent_files_list = [
        'resources.private/ducc-broker-credentials.properties',
        'state/duccling.version',
    ]

    # Dual (head+agent) nodes receive the head list by default.
    dual_dirs_list = head_dirs_list

    # Blank-separated forms consumed by the update methods.
    head_dirs = ' '.join(head_dirs_list)
    agent_dirs = ' '.join(agent_dirs_list)
    dual_dirs = ' '.join(dual_dirs_list)
    agent_files = ' '.join(agent_files_list)

    def __init__(self):
        # Resolve the script path before DuccUtil.__init__ runs, so a relative
        # sys.argv[0] is interpreted against the startup working directory.
        self._script_path = os.path.abspath(sys.argv[0])
        DuccUtil.__init__(self, self.merge)

    def _fn(self):
        """Return the base name of this script file."""
        return os.path.basename(__file__)

    def get_epilog(self):
        """Return the extra help text shown after the option list."""
        return 'Purpose: synchronize DUCC directory(s) from present node to other head & agent nodes.'

    def get_args(self, argv=None):
        """Parse command-line options into self.args.

        argv is the argument list without the program name; None (the
        default) makes argparse read sys.argv[1:].  The help text lists the
        nodes that "all" expands to, so self.head_nodes and self.agent_nodes
        must already hold the default node lists.

        Raises Exception if neither --head-nodes nor --agent-nodes is given.
        """
        self.help_head_nodes = 'Blank separated list of head nodes to synchronize, default:"None"; specify "all" for '+' '.join(self.head_nodes)
        self.help_agent_nodes = 'Blank separated list of agent nodes to synchronize, default:"None"; specify "all" for '+' '.join(self.agent_nodes)
        self.help_head_dirs = 'Blank separated list of head directories (and files) to synchronize, default="'+self.head_dirs+'"'
        self.help_agent_dirs = 'Blank separated list of agent directories (and files) to synchronize, default="'+self.agent_dirs+'"'
        self.help_debug = 'Display debugging messages.'
        self.help_quiet = 'Suppress informational messages.'
        self.parser = argparse.ArgumentParser(formatter_class=RawDescriptionHelpFormatter, epilog=self.get_epilog())
        self.parser.add_argument('--head-nodes', action='store', help=self.help_head_nodes)
        self.parser.add_argument('--agent-nodes', action='store', help=self.help_agent_nodes)
        self.parser.add_argument('--head-dirs', action='store', help=self.help_head_dirs)
        self.parser.add_argument('--agent-dirs', action='store', help=self.help_agent_dirs)
        self.parser.add_argument('--debug', action='store_true', help=self.help_debug)
        self.parser.add_argument('--quiet', action='store_true', help=self.help_quiet)
        self.args = self.parser.parse_args(argv)
        if self.args.head_nodes is None and self.args.agent_nodes is None:
            raise Exception('--head-nodes and/or --agent-nodes must be specified')

    def add(self, list, node):
        """Append node to list unless it is this host or already present."""
        # 'list' shadows the builtin; the parameter name is kept for callers.
        if node != self.localhost and node not in list:
            list.append(node)

    def get_head_node_list(self):
        """Return the default head nodes from ducc.properties, excluding this host.

        ducc.head.reliable.list is used when it names any node; otherwise
        ducc.head is used.  A missing or empty value contributes nothing.
        """
        node_list = []
        head = self.ducc_properties.get(self.key_ducc_head)
        reliable_string = self.ducc_properties.get(self.key_ducc_head_reliable_list)
        # NOTE(review): assumes get() returns None for an absent key
        reliable = reliable_string.split() if reliable_string else []
        if reliable:
            for node in reliable:
                self.add(node_list, node)
        elif head:
            self.add(node_list, head)
        return node_list

    def get_agent_node_list(self):
        """Return the default agent nodes read from the 'ducc.nodes' nodefile, excluding this host."""
        node_list = []
        node_map = {}
        for nodefile in 'ducc.nodes'.split():
            # read_nodefile (DuccUtil) returns (nodes, updated map); only the
            # accumulated map is used here
            nodes, node_map = self.read_nodefile(nodefile, node_map)
        for key in node_map:
            for node in node_map[key]:
                self.add(node_list, node)
        return node_list

    def resolve_head_nodes(self):
        """Apply --head-nodes: absent -> None, "all" -> keep defaults, else the given list."""
        if self.args.head_nodes is None:
            self.head_nodes = None
        elif self.args.head_nodes.lower() != 'all':
            self.head_nodes = self.args.head_nodes.split()

    def resolve_agent_nodes(self):
        """Apply --agent-nodes: absent -> None, "all" -> keep defaults, else the given list."""
        if self.args.agent_nodes is None:
            self.agent_nodes = None
        elif self.args.agent_nodes.lower() != 'all':
            self.agent_nodes = self.args.agent_nodes.split()

    def resolve_dirs(self):
        """Apply the --head-dirs / --agent-dirs overrides.

        Without either option the class defaults are kept.  When either is
        given, dual (head+agent) nodes receive the union of the effective
        head and agent lists, head entries first, without duplicates.
        """
        head_given = self.args.head_dirs is not None
        agent_given = self.args.agent_dirs is not None
        if head_given:
            self.head_dirs = self.args.head_dirs
        if agent_given:
            self.agent_dirs = self.args.agent_dirs
        if head_given or agent_given:
            merged = []
            for entry in self.head_dirs.split() + self.agent_dirs.split():
                if entry not in merged:
                    merged.append(entry)
            self.dual_dirs = ' '.join(merged)

    def _update_node(self, node, user, dirs, files=None):
        """Create DUCC_HOME on node, then push dirs (and files, if given).

        Stops at the first failing step and returns its exit code, else 0.
        """
        rc = self.mkdir(node, user)
        if rc == 0:
            rc = self.rsync_dirs(node, user, dirs)
        if rc == 0 and files is not None:
            rc = self.rsync_files(node, user, files)
        return rc

    def update_dual(self, user):
        """Update nodes that are both head and agent with dual_dirs.

        Every such node is attempted; returns the first non-zero exit code
        encountered, else 0.
        """
        rc = 0
        if self.head_nodes is None or self.agent_nodes is None:
            return rc
        agent_set = set(self.agent_nodes)
        for node in self.head_nodes:
            if node not in agent_set:
                continue
            if self.args.debug:
                print('update head+agent: '+node)
            node_rc = self._update_node(node, user, self.dual_dirs)
            if rc == 0:
                rc = node_rc
        return rc

    def update_heads(self, user):
        """Update head-only nodes with head_dirs.

        Dual nodes are skipped here because update_dual handles them.  Every
        remaining node is attempted; returns the first non-zero exit code
        encountered, else 0.
        """
        rc = 0
        if self.head_nodes is None:
            return rc
        agent_set = set(self.agent_nodes) if self.agent_nodes is not None else set()
        for node in self.head_nodes:
            # decided per node: a dual node must not suppress later nodes
            if node in agent_set:
                continue
            if self.args.debug:
                print('update head: '+node)
            node_rc = self._update_node(node, user, self.head_dirs)
            if rc == 0:
                rc = node_rc
        return rc

    def update_agents(self, user):
        """Update agent-only nodes with agent_dirs and then agent_files.

        Dual nodes are skipped here because update_dual handles them.  Every
        remaining node is attempted; returns the first non-zero exit code
        encountered, else 0.
        """
        rc = 0
        if self.agent_nodes is None:
            return rc
        head_set = set(self.head_nodes) if self.head_nodes is not None else set()
        for node in self.agent_nodes:
            # decided per node: a dual node must not suppress later nodes
            if node in head_set:
                continue
            if self.args.debug:
                print('update agent: '+node)
            node_rc = self._update_node(node, user, self.agent_dirs, self.agent_files)
            if rc == 0:
                rc = node_rc
        return rc

    def _ssh(self, user, node, rmt):
        """Return the shell command that runs rmt on node as user via ssh."""
        return self.ssh_cmd+' '+self.ssh_flags+' '+user+'@'+node+' '+rmt

    def _run(self, cmd):
        """Run shell command cmd; return (exit code, output lines stripped).

        stderr is merged into stdout, so the lines contain both streams.
        """
        proc = subprocess.Popen(cmd, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        lines = [line.strip() for line in proc.stdout]
        proc.wait()
        return proc.returncode, lines

    def _run_logged(self, cmd):
        """Run cmd; show its output if it fails or --debug is set; return its exit code."""
        rc, lines = self._run(cmd)
        if rc != 0 or self.args.debug:
            self.rsync_display(rc, cmd, lines)
        return rc

    def mkdir(self, node, user):
        """Create DUCC_HOME on node (mkdir -p over ssh); return the exit code."""
        rmt = self.mkdir_cmd+' '+self.mkdir_flags+' '+DUCC_HOME
        return self._run_logged(self._ssh(user, node, rmt))

    def _rsync_path(self, node, user, subpath, always_mkdir):
        """rsync DUCC_HOME/subpath to the same relative location on node.

        rsync places the source's last path component inside the destination
        directory, so the destination is the source's parent directory; this
        keeps nested entries such as 'state/duccling.version' in place.  The
        remote parent is created first when always_mkdir is true or when the
        parent is not DUCC_HOME itself.  Returns the first non-zero exit code,
        else 0.
        """
        if not self.args.quiet:
            print('rsync '+user+' '+node+' '+subpath)
        src = os.path.join(DUCC_HOME, subpath)
        parent = os.path.dirname(src)
        if always_mkdir or os.path.normpath(parent) != os.path.normpath(DUCC_HOME):
            rmt = self.mkdir_cmd+' '+self.mkdir_flags+' '+parent
            rc = self._run_logged(self._ssh(user, node, rmt))
            if rc != 0:
                return rc
        cmd = self.rsync_cmd+' '+self.rsync_flags+' '+src+' '+user+'@'+node+':'+parent
        return self._run_logged(cmd)

    def rsync_dirs(self, node, user, subdirs):
        """Push each blank-separated DUCC_HOME-relative entry in subdirs to node.

        Stops at the first failure.  The ~/ducc_runtime link is attempted
        afterwards regardless.  Returns the failing exit code, else 0.
        """
        rc = 0
        for subdir in subdirs.split():
            rc = self._rsync_path(node, user, subdir, False)
            if rc != 0:
                break
        self.symlink(user, node)
        return rc

    def rsync_files(self, node, user, subfiles):
        """Push each blank-separated DUCC_HOME-relative file in subfiles to node.

        The remote parent directory of every file is created first.  Stops at
        the first failure.  The ~/ducc_runtime link is attempted afterwards
        regardless.  Returns the failing exit code, else 0.
        """
        rc = 0
        for subfile in subfiles.split():
            rc = self._rsync_path(node, user, subfile, True)
            if rc != 0:
                break
        self.symlink(user, node)
        return rc

    def rsync_display(self, rc, cmd, lines):
        """Print the exit code and command, then each captured output line."""
        print(str(rc)+' '+cmd)
        for line in lines:
            print(line)

    def symlink(self, user, node):
        """Best effort: link ~/ducc_runtime on node to this DUCC installation.

        The installation root is taken to be the parent of the directory
        holding this script (.../admin).  The remote command's exit status
        and output are ignored.
        """
        script_path = getattr(self, '_script_path', None) or os.path.abspath(sys.argv[0])
        # script_path is absolute, so this never degenerates to '' or '..'
        ducc_home = os.path.dirname(os.path.dirname(script_path))
        if len(ducc_home) > 0:
            # NOTE(review): '~' is expanded locally, which assumes the remote
            # user's home directory has the same path as the local one
            ducc_runtime = os.path.join(os.path.expanduser('~'), 'ducc_runtime')
            # -n: if ducc_runtime already is a link to a directory, do not follow
            # it (which would create a stray link inside that directory)
            rmt = 'ln -sn '+ducc_home+' '+ducc_runtime
            self._run(self._ssh(user, node, rmt))

    def main(self, argv):
        """Synchronize DUCC directories from this node to the selected peers.

        argv is the command line without the program name.  Dual nodes are
        updated first, then head-only, then agent-only nodes; a phase runs
        only if the previous one reported success.

        Returns 0 on success, the first non-zero rsync/ssh exit code on a
        transfer failure, or 1 if an exception was raised (its message is
        printed).
        """
        rc = 0
        try:
            # defaults are needed first: get_args lists them in the help text
            self.head_nodes = self.get_head_node_list()
            self.agent_nodes = self.get_agent_node_list()
            self.get_args(argv)
            self.resolve_head_nodes()
            self.resolve_agent_nodes()
            self.resolve_dirs()
            user = find_ducc_uid()
            rc = self.update_dual(user)
            if rc == 0:
                rc = self.update_heads(user)
            if rc == 0:
                rc = self.update_agents(user)
        except Exception as e:
            print(e)
            rc = 1
        return rc
if __name__ == '__main__':
    instance = DuccRsync()
    # Propagate main()'s return value as the process exit status so callers
    # and cron can detect failures; a None return exits with status 0.
    sys.exit(instance.main(sys.argv[1:]))