UIMA-5742 Reliable DUCC
- DUCC database interface support for contact points (plural), but temporarily only 1 for ducc.database.host


git-svn-id: https://svn.apache.org/repos/asf/uima/uima-ducc/branches/reliable-ducc@1829860 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbDuccWorks.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbDuccWorks.java
index 1280f37..60d1855 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbDuccWorks.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbDuccWorks.java
@@ -70,12 +70,12 @@
 	/*
 	 * connect to DB
 	 */
-	private boolean init(String dburl) throws Exception {
+	private boolean init(String[] dburls) throws Exception {
 		String methodName = "init";
 		boolean ret = false;
 		while (true) {
 			try {
-				dbManager = new DbManager(dburl, logger);
+				dbManager = new DbManager(dburls, logger);
 				dbManager.init();
 				ret = true;
 				break;
@@ -149,8 +149,9 @@
 			 logger.debug(location, jobid, messageDbDisabled);
 			 return;
 		 }
-	     String dbUrl = System.getProperty(DbManager.URL_PROPERTY);
-	     init(dbUrl);
+	     String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+	     String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+	     init(dbUrls);
 	}
 
 	/**
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java
index f5a82ca..ac54bb0 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java
@@ -566,9 +566,11 @@
         long now = System.currentTimeMillis();
         boolean run_test = false;
 
+        String[] dbUrls = DbUtil.dbServersStringToArray(state_url);
+        
         if ( run_test ) {
             try {
-                dbManager = new DbManager(state_url, logger);
+                dbManager = new DbManager(dbUrls, logger);
                 dbManager.init();
                 
                 test();
@@ -580,7 +582,7 @@
 
         }
 
-        dbManager = new DbManager(state_url, logger);
+        dbManager = new DbManager(dbUrls, logger);
         dbManager.init();
 
         if ( true ) {
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
index e5f8f13..5884c64 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
@@ -47,17 +47,17 @@
     private static String db_id = null;
     private static String db_pw = null;
 
-    String dburl;
+    String[] dburls;
     DuccLogger logger;
 
     private Cluster cluster;            // only one
     private Session session;            // only one - it's thread safe and manages a connection pool
 
     
-    public DbManager(String dburl, DuccLogger logger)
+    public DbManager(String[] dburls, DuccLogger logger)
         throws Exception
     {
-        this.dburl = dburl;
+        this.dburls = dburls;
         this.logger = logger;
     }
     
@@ -102,7 +102,7 @@
         ReconnectionPolicy rp = new ConstantReconnectionPolicy(10000);  // if we lose connection, keep trying every 10 seconds
         cluster = Cluster.builder()
             .withAuthProvider(auth)
-            .addContactPoint(dburl)
+            .addContactPoints(dburls)
             .withReconnectionPolicy(rp)
             .build();
 
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbOrchestratorProperties.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbOrchestratorProperties.java
index 9b6ecec..f3c3694 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbOrchestratorProperties.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbOrchestratorProperties.java
@@ -51,12 +51,12 @@
 	/*
 	 * connect to DB
 	 */
-	private boolean init(String dburl) throws Exception {
+	private boolean init(String[] dburls) throws Exception {
 		String methodName = "init";
 		boolean ret = false;
 		while (true) {
 			try {
-				dbManager = new DbManager(dburl, logger);
+				dbManager = new DbManager(dburls, logger);
 				dbManager.init();
 				ret = true;
 				break;
@@ -123,8 +123,9 @@
 	@Override
 	public void init(DuccLogger duccLogger) throws Exception {
 		 this.logger = duccLogger;
-	     String dbUrl = System.getProperty(DbManager.URL_PROPERTY);
-	     init(dbUrl);
+		 String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+	     String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+	     init(dbUrls);
 	}
 
 	/**
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java
index 31a4515..f59af49 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java
@@ -319,5 +319,27 @@
         }
         throw new IllegalArgumentException("Unrecognized type for schema: " + t);
     }
+    
+    static private String[] stringToArray(String input) {
+    	String[] output = null;
+    	if(input == null) {
+    		output = new String[0];
+    	}
+    	else {
+    		output = input.split("\\s+");
+    	}
+    	return output;
+    }
+    
+    // temporary limit??
+    static private int max_db_servers = 1;
+    
+    static String[] dbServersStringToArray(String input) throws Exception {
+    	String[] output = stringToArray(input);
+    	if(output.length > max_db_servers) {
+    		throw new Exception();
+    	}
+    	return output;
+    }
 
 }
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java
deleted file mode 100644
index c693dcc..0000000
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.uima.ducc.database;
-
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.uima.ducc.common.persistence.services.StateServicesDirectory;
-import org.apache.uima.ducc.common.persistence.services.StateServicesSet;
-import org.apache.uima.ducc.common.utils.DuccLogger;
-import org.apache.uima.ducc.common.utils.id.DuccId;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.SimpleStatement;
-
-/**
- * Toy orientdb loader to load a historydb from ducc history
- */
-
-public class DbVerify
-{
-    DuccLogger logger = DuccLogger.getLogger(DbLoader.class, "DBVERIFY");
-    String DUCC_HOME;
-
-    DbManager dbManager = null;
-    long total_bytes = 0;
-
-    boolean verify_res = false;
-    boolean verify_svc = false;
-    boolean verify_job = false;
-    public DbVerify()
-        throws Exception
-    {
-    	//String methodName = "<ctr>";
-        DUCC_HOME = System.getProperty("DUCC_HOME");        
-        if ( DUCC_HOME == null ) {
-            System.out.println("System proprety -DDUCC_HOME must be set.");
-            System.exit(1);
-        }        
-    }
-
-
-    void verify(String table)
-    	throws Exception
-    {
-    	String methodName = "verify";
-        DbHandle h = dbManager.open();
-        SimpleStatement s = new SimpleStatement("SELECT * from " + table);
-        //SimpleStatement s = new SimpleStatement("SELECT * from " + table + " LIMIT 10"); // for test and debug
-        logger.info(methodName, null, "Fetch size", s.getFetchSize());
-        s.setFetchSize(100);
-        long now = System.currentTimeMillis();
-
-        int counter = 0;
-        int nbytes = 0;
-        try {
-            ResultSet rs = h.execute(s);
-            for ( Row r : rs ) {
-                counter++;
-                ByteBuffer b = r.getBytes("work");
-                byte[] bytes = b.array();
-                nbytes += bytes.length;
-                total_bytes += bytes.length;;
-
-                ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-                ObjectInputStream ois = new ObjectInputStream(bais);
-                Object o = ois.readObject();
-                ois.close();            
-                DuccId did = new DuccId(r.getLong("ducc_dbid"));
-                
-                logger.info(methodName, did, "found object class", o.getClass().getName(), "of type", r.getString("type"), "in table", table, "of size", bytes.length);
-            }
-        } catch (Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        
-        logger.info(methodName, null, "Found", counter, "results. Total bytes", nbytes);
-        logger.info(methodName, null, "Total time for", table, System.currentTimeMillis() - now);
-    }
-    
-    void verifyServices()
-    	throws Exception
-    {
-    	String methodName = "verify";
-        int live = 0;
-        int archived = 0;
-        StateServicesDb sdb = new StateServicesDb();
-        sdb.init(logger,dbManager);
-
-        StateServicesDirectory ssd = sdb.fetchServices(true);          // first the archived stuff
-        Map<Long, StateServicesSet>  svcmap = ssd.getMap();
-        for ( Long id : svcmap.keySet() ) {
-            DuccId did = new DuccId(id);
-            archived++;
-            logger.info(methodName, did, "Found an archived service.");
-        }
-
-        ssd = sdb.fetchServices(false);          // first the archived stuff
-        svcmap = ssd.getMap();
-        for ( Long id : svcmap.keySet() ) {
-            DuccId did = new DuccId(id);
-            logger.info(methodName, did, "Found a live service.");
-            live++;
-        }
-        logger.info(methodName, null, "Found", live, "live services and", archived, "archived services.");
-
-    }
-
-    void run()
-        throws Exception
-    {
-        String methodName = "run";
-        long now = System.currentTimeMillis();
-        String state_url = "bluej538";
-        try {
-            dbManager = new DbManager(state_url, logger);
-            dbManager.init();
-
-            verifyServices();
-
-            if ( verify_res ) verify("ducc.res_history");
-            if ( verify_svc ) verify("ducc.svc_history");                
-            if ( verify_job ) verify("ducc.job_history");
-            
-        } finally {
-            dbManager.shutdown();
-        }
-        logger.info(methodName, null, "Read", total_bytes, "bytes in",  System.currentTimeMillis() - now, "MS");
-    }
-
-    
-    public static void main(String[] args)
-    {
-        DbVerify v = null;
-        try {
-            v = new DbVerify();
-            v.run();
-        } catch ( Exception e  ) {
-            e.printStackTrace();
-        } 
-    }
-}
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
index dcb7d38..209cb2d 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
@@ -27,7 +27,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.uima.ducc.common.Pair;
 import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
@@ -103,7 +102,7 @@
     }
     
     
-	private boolean init(String dburl, DbManager dbm)
+	private boolean init(String[] dburls, DbManager dbm)
         throws Exception
     {        
 		String methodName = "init";
@@ -114,7 +113,7 @@
                 if ( dbm != null ) {
                     this.dbManager = dbm;
                 } else {
-                    dbManager = new DbManager(dburl, logger);
+                    dbManager = new DbManager(dburls, logger);
                     dbManager.init();
                 }
                 
@@ -147,8 +146,9 @@
         throws Exception
     {
         this.logger = logger;
-        String historyUrl = System.getProperty(DbManager.URL_PROPERTY);
-        return init(historyUrl, null);
+        String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+        String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+        return init(dbUrls, null);
     }
 
     // package only, for the loader
@@ -156,8 +156,9 @@
     	throws Exception
     {
     	this.logger = logger;
-        String stateUrl = System.getProperty(DbManager.URL_PROPERTY);
-        return init(stateUrl, dbManager);
+        String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+        String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+        return init(dbUrls, dbManager);
     }
 
     /**
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
index 7071016..6d7233e 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
@@ -70,14 +70,14 @@
     {
     }
 
-    private boolean init(String dburl)
+    private boolean init(String[] dburls)
         throws Exception
     {
     	String methodName = "init";
         boolean ret = false;
         while ( true ) {
             try {
-                dbManager = new DbManager(dburl, logger);
+                dbManager = new DbManager(dburls, logger);
                 dbManager.init();
                 ret = true;
                 break;
@@ -98,8 +98,9 @@
     	throws Exception
     {
     	this.logger = logger;
-        String stateUrl = System.getProperty(DbManager.URL_PROPERTY);
-        init(stateUrl);
+    	String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+        String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+        init(dbUrls);
         DbHandle h = dbManager.open();
 
         // For creating a new share
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java
index 2ad9331..3e4e07b 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java
@@ -63,7 +63,7 @@
         }
     }
 
-    private boolean init(String dburl, DbManager dbm)
+    private boolean init(String[] dburls, DbManager dbm)
         throws Exception
     {
     	String methodName = "init";
@@ -76,7 +76,7 @@
         } else {
             while ( true ) {
                 try {
-                    dbManager = new DbManager(dburl, logger);
+                    dbManager = new DbManager(dburls, logger);
                     dbManager.init();
                     break;
                 } catch ( NoHostAvailableException e ) {
@@ -97,8 +97,9 @@
     	throws Exception
     {
     	this.logger = logger;
-        String stateUrl = System.getProperty(DbManager.URL_PROPERTY);
-        return init(stateUrl, null);
+        String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+        String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+        return init(dbUrls, null);
     }
 
     // package only, for the loader
@@ -106,8 +107,9 @@
     	throws Exception
     {
     	this.logger = logger;
-        String stateUrl = System.getProperty(DbManager.URL_PROPERTY);
-        return init(stateUrl, dbManager);
+    	String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+        String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+        return init(dbUrls, dbManager);
     }
 
     private Map<Long, DuccProperties> getProperties(String tableid, IDbProperty[] props, boolean active)