UIMA-6037 agent autostart should rsync its local ducc_runtime tree

git-svn-id: https://svn.apache.org/repos/asf/uima/uima-ducc/trunk@1865751 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/admin/ducc_rsync b/src/main/admin/ducc_rsync
index ea214b0..0e25fbb 100755
--- a/src/main/admin/ducc_rsync
+++ b/src/main/admin/ducc_rsync
@@ -47,7 +47,16 @@
     key_ducc_head_reliable_list = 'ducc.head.reliable.list'
     
     rsync_cmd = 'rsync'
-    rsync_flags = '-avz --delete --ignore-errors'
+    rsync_flags = '-e "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" -avz --links --delete --ignore-errors --timeout 20'  # NOTE(review): these ssh options turn off host key verification - confirm acceptable
+    
+    symlink_cmd = 'rsync'  # NOTE(review): symlink_cmd/symlink_flags are not referenced by the code visible in this change
+    symlink_flags = '-e "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" -vz --links --delete --ignore-errors --timeout 20'
+
+    ssh_cmd = 'ssh'  # used for the remote mkdir and ln commands
+    ssh_flags = '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
+     
+    mkdir_cmd = 'mkdir'
+    mkdir_flags = '-p'  # create missing parents; no error when the directory exists
     
     head_dirs_list = [
         'admin', 
@@ -93,18 +102,24 @@
         'README',
         'RELEASE_NOTES.html',
         'resources', 
-        'resources.private/ducc-broker-credentials.properties',   
+        #'resources.private',   
         #'state',
-        'state/duccling.version',
         #'webserver', 
         ]   
     
+    agent_files_list = [
+        'resources.private/ducc-broker-credentials.properties',  
+        'state/duccling.version', 
+        ]
+    
     dual_dirs_list = head_dirs_list 
     
     head_dirs  = ' '.join(head_dirs_list)
     agent_dirs = ' '.join(agent_dirs_list)
     dual_dirs  = ' '.join(dual_dirs_list)
     
+    agent_files = ' '.join(agent_files_list)
+    
     def __init__(self):
         DuccUtil.__init__(self, self.merge)
 
@@ -200,16 +215,20 @@
 
     # update nodes that are head+agent
     def update_dual(self,user):
+        rc = 0  # returned unchanged when no node is both head and agent
         if(self.head_nodes != None):
             if(self.agent_nodes != None):
                 for node in self.head_nodes:
                     if(node in self.agent_nodes):
                         if(self.args.debug):
                             print 'update head+agent: '+node
-                        self.rsync(node,user,self.dual_dirs)
+                        self.mkdir(node,user)  # make sure DUCC_HOME exists on the node; result is not checked here
+                        rc = self.rsync_dirs(node,user,self.dual_dirs)  # NOTE(review): overwritten per node, so only the last node's status is returned
+        return rc
                                 
     # update nodes that are head only
     def update_heads(self,user):
+        rc = 0
         update = True
         if(self.head_nodes != None):
             for node in self.head_nodes:
@@ -220,10 +239,13 @@
                 if(update):
                     if(self.args.debug):
                         print 'update head: '+node
-                    self.rsync(node,user,self.head_dirs)
+                    self.mkdir(node,user)
+                    rc = self.rsync_dirs(node,user,self.head_dirs)
+        return rc
     
     # update nodes that are agent only
     def update_agents(self,user):
+        rc = 0
         update = True
         if(self.agent_nodes != None):
             for node in self.agent_nodes:
@@ -234,10 +256,32 @@
                 if(update):
                     if(self.args.debug):
                         print 'update agent: '+node
-                    self.rsync(node,user,self.agent_dirs)
-                    
-    # update the specified node, subdirs
-    def rsync(self,node,user,subdirs):
+                    self.mkdir(node,user)
+                    rc = self.rsync_dirs(node,user,self.agent_dirs)
+                    if(rc == 0):
+                        rc = self.rsync_files(node,user,self.agent_files)
+        return rc
+    
+    # update the specified node dirs
+    def mkdir(self,node,user):
+        rc = 0
+        rmt = self.mkdir_cmd+' '+self.mkdir_flags+' '+DUCC_HOME
+        cmd = self.ssh_cmd+' '+self.ssh_flags+' '+user+'@'+node+' '+rmt
+        proc = subprocess.Popen(cmd, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+        lines = []
+        for line in proc.stdout:
+            lines.append(line.strip())
+        proc.wait()
+        rc = proc.returncode
+        if(rc != 0):
+            self.rsync_display(rc, cmd, lines)
+        elif(self.args.debug):
+            self.rsync_display(rc, cmd, lines)
+        return rc
+    
+    # update the specified node dirs
+    def rsync_dirs(self,node,user,subdirs):
+        rc = 0
         for subdir in subdirs.split():
             if(not self.args.quiet):
                 print 'rsync '+user+' '+node+' '+subdir
@@ -250,15 +294,75 @@
             proc.wait()
             rc = proc.returncode
             if(rc != 0):
-                self.rsync_display(cmd, lines)
+                self.rsync_display(rc, cmd, lines)
+                break
             elif(self.args.debug):
-                self.rsync_display(cmd, lines)
+                self.rsync_display(rc, cmd, lines)
+        self.symlink(user,node)
+        return rc
     
-    def rsync_display(self, cmd, lines):
-        print cmd
+    # update the specified node files
+    def rsync_files(self,node,user,subfiles):
+        rc = 0
+        for subfile in subfiles.split():
+            if(not self.args.quiet):
+                print 'rsync '+user+' '+node+' '+subfile
+            file = os.path.join(DUCC_HOME,subfile)
+            # head = abs path of this script (.../admin)
+            head, tail = os.path.split(file)
+            rmt = self.mkdir_cmd+' '+self.mkdir_flags+' '+head
+            cmd = self.ssh_cmd+' '+self.ssh_flags+' '+user+'@'+node+' '+rmt
+            proc = subprocess.Popen(cmd, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+            lines = []
+            for line in proc.stdout:
+                lines.append(line.strip())
+            proc.wait()
+            rc = proc.returncode
+            if(rc != 0):
+                self.rsync_display(rc, cmd, lines)
+                break
+            elif(self.args.debug):
+                self.rsync_display(rc, cmd, lines)
+            # rsync file
+            cmd = self.rsync_cmd+' '+self.rsync_flags+' '+file+' '+user+'@'+node+':'+head
+            proc = subprocess.Popen(cmd, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+            lines = []
+            for line in proc.stdout:
+                lines.append(line.strip())
+            proc.wait()
+            rc = proc.returncode
+            if(rc != 0):
+                self.rsync_display(rc, cmd, lines)
+                break
+            elif(self.args.debug):
+                self.rsync_display(rc, cmd, lines)
+        self.symlink(user,node)
+        return rc
+    
+    def rsync_display(self, rc, cmd, lines):  # print the exit status and command, then each captured output line
+        print rc, cmd
         for line in lines:
             print line
-        
+    
+    def symlink(self,user,node):
+        # head = abs path of this script (.../admin)
+        head, tail = os.path.split(sys.argv[0])
+        # ducc_home = abs path of DUCC_HOME
+        ducc_home, tail = os.path.split(head)
+        if(len(ducc_home) > 0):
+            user_home = os.path.expanduser('~')
+            ducc_runtime = os.path.join(user_home,'ducc_runtime')
+            rmt = 'ln -s '+ducc_home+' '+ducc_runtime
+            cmd = self.ssh_cmd+' '+self.ssh_flags+' '+user+'@'+node+' '+rmt
+            proc = subprocess.Popen(cmd, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+            lines = []
+            for line in proc.stdout:
+                lines.append(line.strip())
+            proc.wait()
+            rc = proc.returncode
+            #if(rc != 0):
+                #self.rsync_display(rc, cmd, lines)
+    
     # do rsync of dir(s) from present node to peer(s)
     def main(self, argv):    
         try:
@@ -268,9 +372,13 @@
             self.resolve_head_nodes()
             self.resolve_agent_nodes()
             user = find_ducc_uid()
-            self.update_dual(user)
-            self.update_heads(user)
-            self.update_agents(user)
+            rc = 0
+            if(rc == 0):
+                rc = self.update_dual(user)
+            if(rc == 0):
+                rc = self.update_heads(user)
+            if(rc == 0):
+                rc = self.update_agents(user)
         except Exception,e:
             print e
         
diff --git a/src/main/admin/ducc_util.py b/src/main/admin/ducc_util.py
index 3ffc538..0126f82 100644
--- a/src/main/admin/ducc_util.py
+++ b/src/main/admin/ducc_util.py
@@ -268,7 +268,17 @@
         self.db_password_guest = dbprops.get('db_password_guest')
         if ( self.db_password_guest == None ):
             self.db_password_guest = 'guest'
-                
+    
+    def db_password(self):  # lazy accessor: calls db_configure() while self.db_password is still None
+        if(self.db_password == None):
+            self.db_configure()
+        return self.db_password  # NOTE(review): method and attribute share the name db_password; an instance attribute of that name (one is assigned None in an __init__ in this file) would shadow this method - confirm
+    
+    def db_password_guest(self):  # lazy accessor for the guest password
+        if(self.db_password == None):  # keyed on db_password, not db_password_guest
+            self.db_configure()
+        return self.db_password_guest  # NOTE(review): self.db_password_guest is also assigned as a plain attribute elsewhere in this file; once set on the instance it would shadow this method - confirm
+    
     # does the database process exist?  
     def db_process_alive(self):
         if ( not os.path.exists(self.db_pidfile) ):
@@ -556,15 +566,15 @@
         cmd = '/bin/hostname'
         if(node == 'localhost'):
             req = self.get_hostname()
-        ssh_cmd = 'ssh -o BatchMode=yes -o ConnectTimeout=10'+' '+node+" "+cmd
+        ssh_cmd = 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o BatchMode=yes -o ConnectTimeout=10'+' '+node+" "+cmd
         resp = self.popen(ssh_cmd)
         lines = resp.readlines()
-        if(len(lines)== 1):
-            line = lines[0]
-            line = line.strip();
-            rsp = line.split('.')[0]
-            if(req == rsp):
-                return True;
+        for line in lines:
+            if(node in line):
+                return True
+        print 'not found: ', node
+        for line in lines:
+            print line
         if(verbosity):
             print 'ssh not operational - unexpected results from:', ssh_cmd
             for line in lines:
@@ -582,11 +592,11 @@
         if ( do_wait ):
             if 'false' == self.ssh_enabled:
                 return self.popen(cmd)
-            return self.popen('ssh -q -o BatchMode=yes -o ConnectTimeout=10', host, cmd)
+            return self.popen('ssh -q -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o BatchMode=yes -o ConnectTimeout=10', host, cmd)
         else:
             if 'false' == self.ssh_enabled:
                 return self.spawn(cmd)
-            return self.spawn('ssh -q -o BatchMode=yes -o ConnectTimeout=10', host, cmd)
+            return self.spawn('ssh -q -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o BatchMode=yes -o ConnectTimeout=10', host, cmd)
 
 
     def set_classpath(self):
@@ -1533,6 +1543,7 @@
         DuccBase.__init__(self, merge)
 
         self.db_disabled = '--disabled--'
+        self.db_password = None
         self.duccling = None
         self.broker_url = 'tcp://localhost:61616'
         self.broker_protocol = 'tcp'
@@ -1572,8 +1583,6 @@
         self.set_classpath()
         self.os_pagesize = self.get_os_pagesize()
         self.update_properties()
-
-        self.db_configure()
         
         manage_broker = self.ducc_properties.get('ducc.broker.automanage')
         self.automanage_broker = False
diff --git a/src/main/admin/rm_qoccupancy b/src/main/admin/rm_qoccupancy
index e13fab4..9bda620 100755
--- a/src/main/admin/rm_qoccupancy
+++ b/src/main/admin/rm_qoccupancy
@@ -122,7 +122,7 @@
         DH = self.DUCC_HOME
         dbn = self.get_db_host()
 
-        guest_pw = self.db_password_guest
+        guest_pw = self.db_password_guest()
 
         os.environ['TERM'] = 'dumb'      # insure no colors.  --no-color isn't inhibiting colors in this shell for some reason.
         CMD = [DH + '/cassandra-server/bin/cqlsh', dbn, '-u', 'guest', '-p', guest_pw, '-e', '"select * from ducc.rmnodes; select * from ducc.rmshares;"']
diff --git a/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandlerRsync.java b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandlerRsync.java
new file mode 100644
index 0000000..195b35e
--- /dev/null
+++ b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandlerRsync.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ws.server;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.uima.ducc.common.IDuccEnv;
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.eclipse.jetty.server.Request;
+
+public class DuccHandlerRsync extends DuccAbstractHandler {
+
+	private static DuccLogger duccLogger = DuccLogger.getLogger(DuccHandlerRsync.class);
+	private static Messages messages = Messages.getInstance();
+	private static DuccId jobid = null;
+	
+	private String duccinator_update = duccContext+"/duccinator-update";
+	
+	public DuccHandlerRsync() {
+	}
+
+	/*
+	 * join lhs + rhs with single separator
+	 */
+	private String joiner(String lhs,String rhs) {
+		String retVal = lhs;
+		if(lhs != null) {
+			if(rhs != null) {
+				retVal = lhs+File.separator+rhs;
+				retVal = retVal.replaceAll("/+","/");
+			}
+		}
+		return retVal;
+	}
+	
+	/*
+	 * produce rsync command line string
+	 */
+	private String getCmdRsync() {
+		String ducc_home = IDuccEnv.DUCC_HOME_DIR;
+		String cmd = "admin"+File.separator+"ducc_rsync";
+		String retVal = joiner(ducc_home,cmd);
+		return retVal;
+	}
+	
+	/*
+	 * run /<ducc_home>/admin/ducc_rsync --agent-nodes <node>
+	 */
+	private String runCmdRsync(String node) {
+		String location = "runCmdRsync";
+		String retVal = null;
+		try {
+			List<String> pb_command = new ArrayList<String>();
+			pb_command.add(getCmdRsync());
+			pb_command.add("--agent-nodes");
+			pb_command.add(node);
+			duccLogger.info(location, jobid, pb_command);
+			ProcessBuilder pb = new ProcessBuilder( pb_command );
+			Process p = pb.start();
+			InputStream pOut = p.getInputStream();
+			InputStreamReader isr = new InputStreamReader(pOut);
+			BufferedReader br = new BufferedReader(isr);
+	        String line;
+	        StringBuffer sb = new StringBuffer();
+	        while ((line = br.readLine()) != null) {
+	        	sb.append(line);
+	        	duccLogger.info(location, jobid, line);
+	        }
+	        retVal = sb.toString();
+	        int rc = p.waitFor();
+	        duccLogger.info(location, jobid, rc);
+		}
+		catch(Exception e) {
+			duccLogger.error(location, jobid, e);
+		}
+		return retVal;
+	}
+	
+	/*
+	 * produce start command line string
+	 */
+	private String getCmdStart() {
+		String ducc_home = IDuccEnv.DUCC_HOME_DIR;
+		String cmd = "admin"+File.separator+"start_ducc";
+		String retVal = joiner(ducc_home,cmd);
+		return retVal;
+	}
+
+	/*
+	 * run /<ducc_home>/admin/start_ducc -c agent@<node>
+	 */
+	private String runCmdStart(String node) {
+		String location = "runCmdStart";
+		String retVal = null;
+		try {
+			List<String> pb_command = new ArrayList<String>();
+			pb_command.add(getCmdStart());
+			pb_command.add("-c");
+			pb_command.add("agent@"+node);
+			duccLogger.info(location, jobid, pb_command);
+			ProcessBuilder pb = new ProcessBuilder( pb_command );
+			Process p = pb.start();
+			InputStream pOut = p.getInputStream();
+			InputStreamReader isr = new InputStreamReader(pOut);
+			BufferedReader br = new BufferedReader(isr);
+	        String line;
+	        StringBuffer sb = new StringBuffer();
+	        while ((line = br.readLine()) != null) {
+	        	sb.append(line);
+	        	duccLogger.info(location, jobid, line);
+	        }
+	        retVal = sb.toString();
+	        int rc = p.waitFor();
+	        duccLogger.info(location, jobid, rc);
+		}
+		catch(Exception e) {
+			duccLogger.error(location, jobid, e);
+		}
+		return retVal;
+	}
+	
+	private void handleDuccServletDuccinatorUpdate(String target,Request baseRequest,HttpServletRequest request,HttpServletResponse response)
+	throws IOException, ServletException
+	{
+		String methodName = "handleDuccServletDuccinatorUpdate";
+		duccLogger.trace(methodName, null, messages.fetch("enter"));
+		StringBuffer sb = new StringBuffer();
+		String node = request.getRemoteHost();
+		String result;;
+		result = runCmdRsync(node);
+		sb.append(result);
+		result = runCmdStart(node);
+		sb.append(result);
+		response.getWriter().println(sb);
+		duccLogger.trace(methodName, null, messages.fetch("exit"));
+	}
+
+	private void handleDuccRequest(String target,Request baseRequest,HttpServletRequest request,HttpServletResponse response)
+	throws Exception
+	{
+		String methodName = "handleDuccRequest";
+		duccLogger.trace(methodName, null, messages.fetch("enter"));
+		duccLogger.debug(methodName, null,request.toString());
+		duccLogger.debug(methodName, null,"getRequestURI():"+request.getRequestURI());
+		String reqURI = request.getRequestURI()+"";
+		if(reqURI.startsWith(duccContext)) {
+			response.setContentType("text/html;charset=utf-8");
+			response.setStatus(HttpServletResponse.SC_OK);
+			baseRequest.setHandled(true);
+			if(reqURI.startsWith(duccinator_update)) {
+				handleDuccServletDuccinatorUpdate(target, baseRequest, request, response);
+				//DuccWebUtil.noCache(response);
+			}
+		}
+		duccLogger.trace(methodName, null, messages.fetch("exit"));
+	}
+
+	public void handle(String target,Request baseRequest,HttpServletRequest request,HttpServletResponse response)
+	throws IOException, ServletException {
+		String methodName = "handle";
+		try{
+			handleDuccRequest(target, baseRequest, request, response);
+		}
+		catch(Throwable t) {
+			if(isIgnorable(t)) {
+				duccLogger.debug(methodName, jobid, t);
+			}
+			else {
+				duccLogger.info(methodName, jobid, "", t.getMessage(), t);
+				duccLogger.error(methodName, jobid, t);
+			}
+		}
+	}
+
+}
diff --git a/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccWebServer.java b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccWebServer.java
index 27ecbe0..7f6ae0a 100644
--- a/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccWebServer.java
+++ b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccWebServer.java
@@ -392,6 +392,7 @@
         DuccHandlerProxy duccHandlerProxy = new DuccHandlerProxy();
         DuccHandlerViz duccHandlerViz = new DuccHandlerViz();
         DuccHandlerUserAuthentication duccHandlerUserAuthentication = new DuccHandlerUserAuthentication();
+        DuccHandlerRsync duccHandlerRsync = new DuccHandlerRsync();
         SessionHandler sessionHandler = new SessionHandler();
         handlers.addHandler(sessionHandler);
         handlers.addHandler(duccHandlerUserAuthentication);
@@ -406,6 +407,7 @@
         for(Handler handler: localHandlers) {
         	handlers.addHandler(handler);
         }
+        handlers.addHandler(duccHandlerRsync);
         handlers.addHandler(duccHandlerJson);
         handlers.addHandler(duccHandlerProxy);
         handlers.addHandler(duccHandlerClassic);