UIMA-6037 agent autostart should rsync its local ducc_runtime tree
git-svn-id: https://svn.apache.org/repos/asf/uima/uima-ducc/trunk@1865751 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/admin/ducc_rsync b/src/main/admin/ducc_rsync
index ea214b0..0e25fbb 100755
--- a/src/main/admin/ducc_rsync
+++ b/src/main/admin/ducc_rsync
@@ -47,7 +47,16 @@
key_ducc_head_reliable_list = 'ducc.head.reliable.list'
rsync_cmd = 'rsync'
- rsync_flags = '-avz --delete --ignore-errors'
+ rsync_flags = '-e "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" -avz --links --delete --ignore-errors --timeout 20'
+
+ symlink_cmd = 'rsync'
+ symlink_flags = '-e "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" -vz --links --delete --ignore-errors --timeout 20'
+
+ ssh_cmd = 'ssh'
+ ssh_flags = '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
+
+ mkdir_cmd = 'mkdir'
+ mkdir_flags = '-p'
head_dirs_list = [
'admin',
@@ -93,18 +102,24 @@
'README',
'RELEASE_NOTES.html',
'resources',
- 'resources.private/ducc-broker-credentials.properties',
+ #'resources.private',
#'state',
- 'state/duccling.version',
#'webserver',
]
+ agent_files_list = [
+ 'resources.private/ducc-broker-credentials.properties',
+ 'state/duccling.version',
+ ]
+
dual_dirs_list = head_dirs_list
head_dirs = ' '.join(head_dirs_list)
agent_dirs = ' '.join(agent_dirs_list)
dual_dirs = ' '.join(dual_dirs_list)
+ agent_files = ' '.join(agent_files_list)
+
def __init__(self):
DuccUtil.__init__(self, self.merge)
@@ -200,16 +215,20 @@
# update nodes that are head+agent
def update_dual(self,user):
+ rc = 0
if(self.head_nodes != None):
if(self.agent_nodes != None):
for node in self.head_nodes:
if(node in self.agent_nodes):
if(self.args.debug):
print 'update head+agent: '+node
- self.rsync(node,user,self.dual_dirs)
+ self.mkdir(node,user)
+ rc = self.rsync_dirs(node,user,self.dual_dirs)
+ return rc
# update nodes that are head only
def update_heads(self,user):
+ rc = 0
update = True
if(self.head_nodes != None):
for node in self.head_nodes:
@@ -220,10 +239,13 @@
if(update):
if(self.args.debug):
print 'update head: '+node
- self.rsync(node,user,self.head_dirs)
+ self.mkdir(node,user)
+ rc = self.rsync_dirs(node,user,self.head_dirs)
+ return rc
# update nodes that are agent only
def update_agents(self,user):
+ rc = 0
update = True
if(self.agent_nodes != None):
for node in self.agent_nodes:
@@ -234,10 +256,32 @@
if(update):
if(self.args.debug):
print 'update agent: '+node
- self.rsync(node,user,self.agent_dirs)
-
- # update the specified node, subdirs
- def rsync(self,node,user,subdirs):
+ self.mkdir(node,user)
+ rc = self.rsync_dirs(node,user,self.agent_dirs)
+ if(rc == 0):
+ rc = self.rsync_files(node,user,self.agent_files)
+ return rc
+
+ # update the specified node dirs
+ def mkdir(self,node,user):
+ rc = 0
+ rmt = self.mkdir_cmd+' '+self.mkdir_flags+' '+DUCC_HOME
+ cmd = self.ssh_cmd+' '+self.ssh_flags+' '+user+'@'+node+' '+rmt
+ proc = subprocess.Popen(cmd, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ lines = []
+ for line in proc.stdout:
+ lines.append(line.strip())
+ proc.wait()
+ rc = proc.returncode
+ if(rc != 0):
+ self.rsync_display(rc, cmd, lines)
+ elif(self.args.debug):
+ self.rsync_display(rc, cmd, lines)
+ return rc
+
+ # update the specified node dirs
+ def rsync_dirs(self,node,user,subdirs):
+ rc = 0
for subdir in subdirs.split():
if(not self.args.quiet):
print 'rsync '+user+' '+node+' '+subdir
@@ -250,15 +294,75 @@
proc.wait()
rc = proc.returncode
if(rc != 0):
- self.rsync_display(cmd, lines)
+ self.rsync_display(rc, cmd, lines)
+ break
elif(self.args.debug):
- self.rsync_display(cmd, lines)
+ self.rsync_display(rc, cmd, lines)
+ self.symlink(user,node)
+ return rc
- def rsync_display(self, cmd, lines):
- print cmd
+ # update the specified node files
+ def rsync_files(self,node,user,subfiles):
+ rc = 0
+ for subfile in subfiles.split():
+ if(not self.args.quiet):
+ print 'rsync '+user+' '+node+' '+subfile
+ file = os.path.join(DUCC_HOME,subfile)
+ # head = abs path of this script (.../admin)
+ head, tail = os.path.split(file)
+ rmt = self.mkdir_cmd+' '+self.mkdir_flags+' '+head
+ cmd = self.ssh_cmd+' '+self.ssh_flags+' '+user+'@'+node+' '+rmt
+ proc = subprocess.Popen(cmd, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ lines = []
+ for line in proc.stdout:
+ lines.append(line.strip())
+ proc.wait()
+ rc = proc.returncode
+ if(rc != 0):
+ self.rsync_display(rc, cmd, lines)
+ break
+ elif(self.args.debug):
+ self.rsync_display(rc, cmd, lines)
+ # rsync file
+ cmd = self.rsync_cmd+' '+self.rsync_flags+' '+file+' '+user+'@'+node+':'+head
+ proc = subprocess.Popen(cmd, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ lines = []
+ for line in proc.stdout:
+ lines.append(line.strip())
+ proc.wait()
+ rc = proc.returncode
+ if(rc != 0):
+ self.rsync_display(rc, cmd, lines)
+ break
+ elif(self.args.debug):
+ self.rsync_display(rc, cmd, lines)
+ self.symlink(user,node)
+ return rc
+
+ def rsync_display(self, rc, cmd, lines):
+ print rc, cmd
for line in lines:
print line
-
+
+ def symlink(self,user,node):
+ # head = abs path of this script (.../admin)
+ head, tail = os.path.split(sys.argv[0])
+ # ducc_home = abs path of DUCC_HOME
+ ducc_home, tail = os.path.split(head)
+ if(len(ducc_home) > 0):
+ user_home = os.path.expanduser('~')
+ ducc_runtime = os.path.join(user_home,'ducc_runtime')
+ rmt = 'ln -s '+ducc_home+' '+ducc_runtime
+ cmd = self.ssh_cmd+' '+self.ssh_flags+' '+user+'@'+node+' '+rmt
+ proc = subprocess.Popen(cmd, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ lines = []
+ for line in proc.stdout:
+ lines.append(line.strip())
+ proc.wait()
+ rc = proc.returncode
+ #if(rc != 0):
+ #self.rsync_display(rc, cmd, lines)
+
# do rsync of dir(s) from present node to peer(s)
def main(self, argv):
try:
@@ -268,9 +372,13 @@
self.resolve_head_nodes()
self.resolve_agent_nodes()
user = find_ducc_uid()
- self.update_dual(user)
- self.update_heads(user)
- self.update_agents(user)
+ rc = 0
+ if(rc == 0):
+ rc = self.update_dual(user)
+ if(rc == 0):
+ rc = self.update_heads(user)
+ if(rc == 0):
+ rc = self.update_agents(user)
except Exception,e:
print e
diff --git a/src/main/admin/ducc_util.py b/src/main/admin/ducc_util.py
index 3ffc538..0126f82 100644
--- a/src/main/admin/ducc_util.py
+++ b/src/main/admin/ducc_util.py
@@ -268,7 +268,17 @@
self.db_password_guest = dbprops.get('db_password_guest')
if ( self.db_password_guest == None ):
self.db_password_guest = 'guest'
-
+
+ def db_password(self):
+ if(self.db_password == None):
+ self.db_configure()
+ return self.db_password
+
+ def db_password_guest(self):
+ if(self.db_password == None):
+ self.db_configure()
+ return self.db_password_guest
+
# does the database process exist?
def db_process_alive(self):
if ( not os.path.exists(self.db_pidfile) ):
@@ -556,15 +566,15 @@
cmd = '/bin/hostname'
if(node == 'localhost'):
req = self.get_hostname()
- ssh_cmd = 'ssh -o BatchMode=yes -o ConnectTimeout=10'+' '+node+" "+cmd
+ ssh_cmd = 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o BatchMode=yes -o ConnectTimeout=10'+' '+node+" "+cmd
resp = self.popen(ssh_cmd)
lines = resp.readlines()
- if(len(lines)== 1):
- line = lines[0]
- line = line.strip();
- rsp = line.split('.')[0]
- if(req == rsp):
- return True;
+ for line in lines:
+ if(node in line):
+ return True
+ print 'not found: ', node
+ for line in lines:
+ print line
if(verbosity):
print 'ssh not operational - unexpected results from:', ssh_cmd
for line in lines:
@@ -582,11 +592,11 @@
if ( do_wait ):
if 'false' == self.ssh_enabled:
return self.popen(cmd)
- return self.popen('ssh -q -o BatchMode=yes -o ConnectTimeout=10', host, cmd)
+ return self.popen('ssh -q -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o BatchMode=yes -o ConnectTimeout=10', host, cmd)
else:
if 'false' == self.ssh_enabled:
return self.spawn(cmd)
- return self.spawn('ssh -q -o BatchMode=yes -o ConnectTimeout=10', host, cmd)
+ return self.spawn('ssh -q -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o BatchMode=yes -o ConnectTimeout=10', host, cmd)
def set_classpath(self):
@@ -1533,6 +1543,7 @@
DuccBase.__init__(self, merge)
self.db_disabled = '--disabled--'
+ self.db_password = None
self.duccling = None
self.broker_url = 'tcp://localhost:61616'
self.broker_protocol = 'tcp'
@@ -1572,8 +1583,6 @@
self.set_classpath()
self.os_pagesize = self.get_os_pagesize()
self.update_properties()
-
- self.db_configure()
manage_broker = self.ducc_properties.get('ducc.broker.automanage')
self.automanage_broker = False
diff --git a/src/main/admin/rm_qoccupancy b/src/main/admin/rm_qoccupancy
index e13fab4..9bda620 100755
--- a/src/main/admin/rm_qoccupancy
+++ b/src/main/admin/rm_qoccupancy
@@ -122,7 +122,7 @@
DH = self.DUCC_HOME
dbn = self.get_db_host()
- guest_pw = self.db_password_guest
+ guest_pw = self.db_password_guest()
os.environ['TERM'] = 'dumb' # insure no colors. --no-color isn't inhibiting colors in this shell for some reason.
CMD = [DH + '/cassandra-server/bin/cqlsh', dbn, '-u', 'guest', '-p', guest_pw, '-e', '"select * from ducc.rmnodes; select * from ducc.rmshares;"']
diff --git a/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandlerRsync.java b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandlerRsync.java
new file mode 100644
index 0000000..195b35e
--- /dev/null
+++ b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandlerRsync.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ws.server;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.uima.ducc.common.IDuccEnv;
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.eclipse.jetty.server.Request;
+
+public class DuccHandlerRsync extends DuccAbstractHandler {
+
+ private static DuccLogger duccLogger = DuccLogger.getLogger(DuccHandlerRsync.class);
+ private static Messages messages = Messages.getInstance();
+ private static DuccId jobid = null;
+
+ private String duccinator_update = duccContext+"/duccinator-update";
+
+ public DuccHandlerRsync() {
+ }
+
+ /*
+ * join lhs + rhs with single separator
+ */
+ private String joiner(String lhs,String rhs) {
+ String retVal = lhs;
+ if(lhs != null) {
+ if(rhs != null) {
+ retVal = lhs+File.separator+rhs;
+ retVal = retVal.replaceAll("/+","/");
+ }
+ }
+ return retVal;
+ }
+
+ /*
+ * produce rsync command line string
+ */
+ private String getCmdRsync() {
+ String ducc_home = IDuccEnv.DUCC_HOME_DIR;
+ String cmd = "admin"+File.separator+"ducc_rsync";
+ String retVal = joiner(ducc_home,cmd);
+ return retVal;
+ }
+
+ /*
+ * run /<ducc_home>/admin/ducc_rsync --agent-nodes <node>
+ */
+ private String runCmdRsync(String node) {
+ String location = "runCmdRsync";
+ String retVal = null;
+ try {
+ List<String> pb_command = new ArrayList<String>();
+ pb_command.add(getCmdRsync());
+ pb_command.add("--agent-nodes");
+ pb_command.add(node);
+ duccLogger.info(location, jobid, pb_command);
+ ProcessBuilder pb = new ProcessBuilder( pb_command );
+ Process p = pb.start();
+ InputStream pOut = p.getInputStream();
+ InputStreamReader isr = new InputStreamReader(pOut);
+ BufferedReader br = new BufferedReader(isr);
+ String line;
+ StringBuffer sb = new StringBuffer();
+ while ((line = br.readLine()) != null) {
+ sb.append(line);
+ duccLogger.info(location, jobid, line);
+ }
+ retVal = sb.toString();
+ int rc = p.waitFor();
+ duccLogger.info(location, jobid, rc);
+ }
+ catch(Exception e) {
+ duccLogger.error(location, jobid, e);
+ }
+ return retVal;
+ }
+
+ /*
+ * produce start command line string
+ */
+ private String getCmdStart() {
+ String ducc_home = IDuccEnv.DUCC_HOME_DIR;
+ String cmd = "admin"+File.separator+"start_ducc";
+ String retVal = joiner(ducc_home,cmd);
+ return retVal;
+ }
+
+ /*
+ * run /<ducc_home>/admin/start_ducc -c agent@<node>
+ */
+ private String runCmdStart(String node) {
+ String location = "runCmdStart";
+ String retVal = null;
+ try {
+ List<String> pb_command = new ArrayList<String>();
+ pb_command.add(getCmdStart());
+ pb_command.add("-c");
+ pb_command.add("agent@"+node);
+ duccLogger.info(location, jobid, pb_command);
+ ProcessBuilder pb = new ProcessBuilder( pb_command );
+ Process p = pb.start();
+ InputStream pOut = p.getInputStream();
+ InputStreamReader isr = new InputStreamReader(pOut);
+ BufferedReader br = new BufferedReader(isr);
+ String line;
+ StringBuffer sb = new StringBuffer();
+ while ((line = br.readLine()) != null) {
+ sb.append(line);
+ duccLogger.info(location, jobid, line);
+ }
+ retVal = sb.toString();
+ int rc = p.waitFor();
+ duccLogger.info(location, jobid, rc);
+ }
+ catch(Exception e) {
+ duccLogger.error(location, jobid, e);
+ }
+ return retVal;
+ }
+
+ private void handleDuccServletDuccinatorUpdate(String target,Request baseRequest,HttpServletRequest request,HttpServletResponse response)
+ throws IOException, ServletException
+ {
+ String methodName = "handleDuccServletDuccinatorUpdate";
+ duccLogger.trace(methodName, null, messages.fetch("enter"));
+ StringBuffer sb = new StringBuffer();
+ String node = request.getRemoteHost();
+ String result;;
+ result = runCmdRsync(node);
+ sb.append(result);
+ result = runCmdStart(node);
+ sb.append(result);
+ response.getWriter().println(sb);
+ duccLogger.trace(methodName, null, messages.fetch("exit"));
+ }
+
+ private void handleDuccRequest(String target,Request baseRequest,HttpServletRequest request,HttpServletResponse response)
+ throws Exception
+ {
+ String methodName = "handleDuccRequest";
+ duccLogger.trace(methodName, null, messages.fetch("enter"));
+ duccLogger.debug(methodName, null,request.toString());
+ duccLogger.debug(methodName, null,"getRequestURI():"+request.getRequestURI());
+ String reqURI = request.getRequestURI()+"";
+ if(reqURI.startsWith(duccContext)) {
+ response.setContentType("text/html;charset=utf-8");
+ response.setStatus(HttpServletResponse.SC_OK);
+ baseRequest.setHandled(true);
+ if(reqURI.startsWith(duccinator_update)) {
+ handleDuccServletDuccinatorUpdate(target, baseRequest, request, response);
+ //DuccWebUtil.noCache(response);
+ }
+ }
+ duccLogger.trace(methodName, null, messages.fetch("exit"));
+ }
+
+ public void handle(String target,Request baseRequest,HttpServletRequest request,HttpServletResponse response)
+ throws IOException, ServletException {
+ String methodName = "handle";
+ try{
+ handleDuccRequest(target, baseRequest, request, response);
+ }
+ catch(Throwable t) {
+ if(isIgnorable(t)) {
+ duccLogger.debug(methodName, jobid, t);
+ }
+ else {
+ duccLogger.info(methodName, jobid, "", t.getMessage(), t);
+ duccLogger.error(methodName, jobid, t);
+ }
+ }
+ }
+
+}
diff --git a/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccWebServer.java b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccWebServer.java
index 27ecbe0..7f6ae0a 100644
--- a/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccWebServer.java
+++ b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccWebServer.java
@@ -392,6 +392,7 @@
DuccHandlerProxy duccHandlerProxy = new DuccHandlerProxy();
DuccHandlerViz duccHandlerViz = new DuccHandlerViz();
DuccHandlerUserAuthentication duccHandlerUserAuthentication = new DuccHandlerUserAuthentication();
+ DuccHandlerRsync duccHandlerRsync = new DuccHandlerRsync();
SessionHandler sessionHandler = new SessionHandler();
handlers.addHandler(sessionHandler);
handlers.addHandler(duccHandlerUserAuthentication);
@@ -406,6 +407,7 @@
for(Handler handler: localHandlers) {
handlers.addHandler(handler);
}
+ handlers.addHandler(duccHandlerRsync);
handlers.addHandler(duccHandlerJson);
handlers.addHandler(duccHandlerProxy);
handlers.addHandler(duccHandlerClassic);