UIMA-5742 Reliable DUCC
- DUCC database interface now supports multiple contact points (plural), but ducc.database.host is temporarily limited to a single one
git-svn-id: https://svn.apache.org/repos/asf/uima/uima-ducc/branches/reliable-ducc@1829860 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbDuccWorks.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbDuccWorks.java
index 1280f37..60d1855 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbDuccWorks.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbDuccWorks.java
@@ -70,12 +70,12 @@
/*
* connect to DB
*/
- private boolean init(String dburl) throws Exception {
+ private boolean init(String[] dburls) throws Exception {
String methodName = "init";
boolean ret = false;
while (true) {
try {
- dbManager = new DbManager(dburl, logger);
+ dbManager = new DbManager(dburls, logger);
dbManager.init();
ret = true;
break;
@@ -149,8 +149,9 @@
logger.debug(location, jobid, messageDbDisabled);
return;
}
- String dbUrl = System.getProperty(DbManager.URL_PROPERTY);
- init(dbUrl);
+ String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+ String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+ init(dbUrls);
}
/**
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java
index f5a82ca..ac54bb0 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java
@@ -566,9 +566,11 @@
long now = System.currentTimeMillis();
boolean run_test = false;
+ String[] dbUrls = DbUtil.dbServersStringToArray(state_url);
+
if ( run_test ) {
try {
- dbManager = new DbManager(state_url, logger);
+ dbManager = new DbManager(dbUrls, logger);
dbManager.init();
test();
@@ -580,7 +582,7 @@
}
- dbManager = new DbManager(state_url, logger);
+ dbManager = new DbManager(dbUrls, logger);
dbManager.init();
if ( true ) {
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
index e5f8f13..5884c64 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
@@ -47,17 +47,17 @@
private static String db_id = null;
private static String db_pw = null;
- String dburl;
+ String[] dburls;
DuccLogger logger;
private Cluster cluster; // only one
private Session session; // only one - it's thread safe and manages a connection pool
- public DbManager(String dburl, DuccLogger logger)
+ public DbManager(String[] dburls, DuccLogger logger)
throws Exception
{
- this.dburl = dburl;
+ this.dburls = dburls;
this.logger = logger;
}
@@ -102,7 +102,7 @@
ReconnectionPolicy rp = new ConstantReconnectionPolicy(10000); // if we lose connection, keep trying every 10 seconds
cluster = Cluster.builder()
.withAuthProvider(auth)
- .addContactPoint(dburl)
+ .addContactPoints(dburls)
.withReconnectionPolicy(rp)
.build();
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbOrchestratorProperties.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbOrchestratorProperties.java
index 9b6ecec..f3c3694 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbOrchestratorProperties.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbOrchestratorProperties.java
@@ -51,12 +51,12 @@
/*
* connect to DB
*/
- private boolean init(String dburl) throws Exception {
+ private boolean init(String[] dburls) throws Exception {
String methodName = "init";
boolean ret = false;
while (true) {
try {
- dbManager = new DbManager(dburl, logger);
+ dbManager = new DbManager(dburls, logger);
dbManager.init();
ret = true;
break;
@@ -123,8 +123,9 @@
@Override
public void init(DuccLogger duccLogger) throws Exception {
this.logger = duccLogger;
- String dbUrl = System.getProperty(DbManager.URL_PROPERTY);
- init(dbUrl);
+ String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+ String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+ init(dbUrls);
}
/**
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java
index 31a4515..f59af49 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java
@@ -319,5 +319,27 @@
}
throw new IllegalArgumentException("Unrecognized type for schema: " + t);
}
+
+    // Split a whitespace-separated list into tokens; null or blank input yields an empty array.
+    static private String[] stringToArray(String input) {
+        // Trim first: split() on text with leading whitespace yields a spurious "" token.
+        String trimmed = (input == null) ? "" : input.trim();
+        return trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
+    }
+
+    // Temporary limit on the number of database servers accepted.
+    static private final int max_db_servers = 1;
+
+    /**
+     * Convert the whitespace-separated database server list into an array.
+     * @throws Exception if more than max_db_servers servers are listed
+     */
+    static String[] dbServersStringToArray(String input) throws Exception {
+        String[] output = stringToArray(input);
+        if(output.length > max_db_servers) {
+            throw new Exception("too many database servers: "+output.length+", limit is "+max_db_servers);
+        }
+        return output;
+    }
}
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java
deleted file mode 100644
index c693dcc..0000000
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.uima.ducc.database;
-
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.uima.ducc.common.persistence.services.StateServicesDirectory;
-import org.apache.uima.ducc.common.persistence.services.StateServicesSet;
-import org.apache.uima.ducc.common.utils.DuccLogger;
-import org.apache.uima.ducc.common.utils.id.DuccId;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.SimpleStatement;
-
-/**
- * Toy orientdb loader to load a historydb from ducc history
- */
-
-public class DbVerify
-{
- DuccLogger logger = DuccLogger.getLogger(DbLoader.class, "DBVERIFY");
- String DUCC_HOME;
-
- DbManager dbManager = null;
- long total_bytes = 0;
-
- boolean verify_res = false;
- boolean verify_svc = false;
- boolean verify_job = false;
- public DbVerify()
- throws Exception
- {
- //String methodName = "<ctr>";
- DUCC_HOME = System.getProperty("DUCC_HOME");
- if ( DUCC_HOME == null ) {
- System.out.println("System proprety -DDUCC_HOME must be set.");
- System.exit(1);
- }
- }
-
-
- void verify(String table)
- throws Exception
- {
- String methodName = "verify";
- DbHandle h = dbManager.open();
- SimpleStatement s = new SimpleStatement("SELECT * from " + table);
- //SimpleStatement s = new SimpleStatement("SELECT * from " + table + " LIMIT 10"); // for test and debug
- logger.info(methodName, null, "Fetch size", s.getFetchSize());
- s.setFetchSize(100);
- long now = System.currentTimeMillis();
-
- int counter = 0;
- int nbytes = 0;
- try {
- ResultSet rs = h.execute(s);
- for ( Row r : rs ) {
- counter++;
- ByteBuffer b = r.getBytes("work");
- byte[] bytes = b.array();
- nbytes += bytes.length;
- total_bytes += bytes.length;;
-
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- ObjectInputStream ois = new ObjectInputStream(bais);
- Object o = ois.readObject();
- ois.close();
- DuccId did = new DuccId(r.getLong("ducc_dbid"));
-
- logger.info(methodName, did, "found object class", o.getClass().getName(), "of type", r.getString("type"), "in table", table, "of size", bytes.length);
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- logger.info(methodName, null, "Found", counter, "results. Total bytes", nbytes);
- logger.info(methodName, null, "Total time for", table, System.currentTimeMillis() - now);
- }
-
- void verifyServices()
- throws Exception
- {
- String methodName = "verify";
- int live = 0;
- int archived = 0;
- StateServicesDb sdb = new StateServicesDb();
- sdb.init(logger,dbManager);
-
- StateServicesDirectory ssd = sdb.fetchServices(true); // first the archived stuff
- Map<Long, StateServicesSet> svcmap = ssd.getMap();
- for ( Long id : svcmap.keySet() ) {
- DuccId did = new DuccId(id);
- archived++;
- logger.info(methodName, did, "Found an archived service.");
- }
-
- ssd = sdb.fetchServices(false); // first the archived stuff
- svcmap = ssd.getMap();
- for ( Long id : svcmap.keySet() ) {
- DuccId did = new DuccId(id);
- logger.info(methodName, did, "Found a live service.");
- live++;
- }
- logger.info(methodName, null, "Found", live, "live services and", archived, "archived services.");
-
- }
-
- void run()
- throws Exception
- {
- String methodName = "run";
- long now = System.currentTimeMillis();
- String state_url = "bluej538";
- try {
- dbManager = new DbManager(state_url, logger);
- dbManager.init();
-
- verifyServices();
-
- if ( verify_res ) verify("ducc.res_history");
- if ( verify_svc ) verify("ducc.svc_history");
- if ( verify_job ) verify("ducc.job_history");
-
- } finally {
- dbManager.shutdown();
- }
- logger.info(methodName, null, "Read", total_bytes, "bytes in", System.currentTimeMillis() - now, "MS");
- }
-
-
- public static void main(String[] args)
- {
- DbVerify v = null;
- try {
- v = new DbVerify();
- v.run();
- } catch ( Exception e ) {
- e.printStackTrace();
- }
- }
-}
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
index dcb7d38..209cb2d 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
@@ -27,7 +27,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.uima.ducc.common.Pair;
import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
@@ -103,7 +102,7 @@
}
- private boolean init(String dburl, DbManager dbm)
+ private boolean init(String[] dburls, DbManager dbm)
throws Exception
{
String methodName = "init";
@@ -114,7 +113,7 @@
if ( dbm != null ) {
this.dbManager = dbm;
} else {
- dbManager = new DbManager(dburl, logger);
+ dbManager = new DbManager(dburls, logger);
dbManager.init();
}
@@ -147,8 +146,9 @@
throws Exception
{
this.logger = logger;
- String historyUrl = System.getProperty(DbManager.URL_PROPERTY);
- return init(historyUrl, null);
+ String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+ String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+ return init(dbUrls, null);
}
// package only, for the loader
@@ -156,8 +156,9 @@
throws Exception
{
this.logger = logger;
- String stateUrl = System.getProperty(DbManager.URL_PROPERTY);
- return init(stateUrl, dbManager);
+ String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+ String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+ return init(dbUrls, dbManager);
}
/**
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
index 7071016..6d7233e 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
@@ -70,14 +70,14 @@
{
}
- private boolean init(String dburl)
+ private boolean init(String[] dburls)
throws Exception
{
String methodName = "init";
boolean ret = false;
while ( true ) {
try {
- dbManager = new DbManager(dburl, logger);
+ dbManager = new DbManager(dburls, logger);
dbManager.init();
ret = true;
break;
@@ -98,8 +98,9 @@
throws Exception
{
this.logger = logger;
- String stateUrl = System.getProperty(DbManager.URL_PROPERTY);
- init(stateUrl);
+ String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+ String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+ init(dbUrls);
DbHandle h = dbManager.open();
// For creating a new share
diff --git a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java
index 2ad9331..3e4e07b 100644
--- a/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java
+++ b/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java
@@ -63,7 +63,7 @@
}
}
- private boolean init(String dburl, DbManager dbm)
+ private boolean init(String[] dburls, DbManager dbm)
throws Exception
{
String methodName = "init";
@@ -76,7 +76,7 @@
} else {
while ( true ) {
try {
- dbManager = new DbManager(dburl, logger);
+ dbManager = new DbManager(dburls, logger);
dbManager.init();
break;
} catch ( NoHostAvailableException e ) {
@@ -97,8 +97,9 @@
throws Exception
{
this.logger = logger;
- String stateUrl = System.getProperty(DbManager.URL_PROPERTY);
- return init(stateUrl, null);
+ String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+ String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+ return init(dbUrls, null);
}
// package only, for the loader
@@ -106,8 +107,9 @@
throws Exception
{
this.logger = logger;
- String stateUrl = System.getProperty(DbManager.URL_PROPERTY);
- return init(stateUrl, dbManager);
+ String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
+ String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
+ return init(dbUrls, dbManager);
}
private Map<Long, DuccProperties> getProperties(String tableid, IDbProperty[] props, boolean active)