blob: 52817137d6076b5acff482741ce9c7d5126bd8ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.database;
import java.util.ArrayList;
import java.util.List;
import org.apache.uima.ducc.common.db.DbHelper;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.id.DuccId;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
public class DbOrchestratorProperties implements IDbOrchestratorProperties {
/*
* table comprising Orchestrator properties as name/value pairs
*/
private static String ORCHESTRATOR_PROPERTIES_TABLE = DbOrchestratorProperties.TABLE_NAME.pname();
private static String COL_NAME = DbOrchestratorProperties.name.columnName();
private static String COL_VALUE = DbOrchestratorProperties.value.columnName();
private DuccLogger logger = null;
private DbManager dbManager = null;
private DuccId jobid = null;
public DbOrchestratorProperties(DuccLogger duccLogger) throws Exception {
init(duccLogger);
}
/*
* connect to DB
*/
private boolean init(String[] dburls) throws Exception {
String methodName = "init";
boolean ret = false;
while (true) {
try {
dbManager = new DbManager(dburls, logger);
dbManager.init();
ret = true;
break;
} catch (NoHostAvailableException e) {
logger.error(methodName, null,
"Cannot contact database. Retrying in 5 seconds.");
Thread.sleep(5000);
} catch (Exception e) {
logger.error(methodName, null,
"Errors contacting database. No connetion made.");
logger.error(methodName, null, e);
ret = false;
break;
}
}
return ret;
}
/*
* CQL to create:
* Orchestrator properties table
*/
protected static List<SimpleStatement> mkSchema() throws Exception {
List<SimpleStatement> ret = new ArrayList<SimpleStatement>();
StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS "
+ ORCHESTRATOR_PROPERTIES_TABLE + " (");
buf.append(DbUtil.mkSchema(DbOrchestratorProperties.values()));
buf.append(")");
ret.add(new SimpleStatement(buf.toString()));
List<String> indexes = DbUtil.mkIndices(DbOrchestratorProperties.values(),
ORCHESTRATOR_PROPERTIES_TABLE);
for (String s : indexes) {
ret.add(new SimpleStatement(s));
}
return ret;
}
/**
* Create tables(s)
*/
public void dbInit() throws Exception {
String location = "dbInit";
try {
List<SimpleStatement>specificationsSchema = mkSchema();
DbHandle h = dbManager.open();
for ( SimpleStatement s : specificationsSchema ) {
logger.info(location, jobid, "EXECUTE STATEMENT:"+s.toString());
h.execute(s);
}
initPublicationSeqNo();
initDuccWorkSeqNo();
}
catch(Exception e) {
logger.error(location, jobid, e);
throw e;
}
}
/**
* Prepare for DB access
*/
@Override
public void init(DuccLogger duccLogger) throws Exception {
this.logger = duccLogger;
String[] dbUrls = DbHelper.getHostList();
init(dbUrls);
}
/**
* Add or update an Orchestrator property
*/
@Override
public void upsert(String name, String value) throws Exception {
String location = "upsert";
try {
String table = ORCHESTRATOR_PROPERTIES_TABLE;
String c0 = COL_VALUE+"="+"'"+value+"'";
String c1 = COL_NAME+"="+"'"+name+"'";
String cql = "UPDATE "+table+" SET "+c0+" WHERE "+c1;
logger.debug(location, jobid, cql);
DbHandle h = dbManager.open();
h.execute(cql);
}
catch(Exception e) {
DuccId duccid = null;
String text = "name="+name+" "+"value="+value;
logger.error(location, duccid, text, e);
throw e;
}
}
/**
* Add or update an Orchestrator property
*/
private void insert(String name, String value) throws Exception {
String location = "insert";
try {
String table = ORCHESTRATOR_PROPERTIES_TABLE;
String c0 = COL_VALUE+"="+"'"+value+"'";
String c1 = COL_NAME+"="+"'"+name+"'";
String cql = "UPDATE "+table+" SET "+c0+" WHERE "+c1;
logger.debug(location, jobid, cql);
DbHandle h = dbManager.open();
h.execute(cql);
}
catch(Exception e) {
DuccId duccid = null;
String text = "name="+name+" "+"value="+value;
logger.error(location, duccid, text, e);
throw e;
}
}
/**
* Retrieve an Orchestrator property
*/
@Override
public String fetch(String name) throws Exception {
String location = "fetch";
String value = null;
try {
String table = ORCHESTRATOR_PROPERTIES_TABLE;
String c1 = COL_NAME+"="+"'"+name+"'";
String cql = "SELECT * FROM "+table+" WHERE "+c1;
logger.debug(location, jobid, cql);
DbHandle h = dbManager.open();
ResultSet rs = h.execute(cql);
for ( Row r : rs ) {
String colValue = r.getString(COL_VALUE);
value = colValue;
}
}
catch(Exception e) {
DuccId duccid = null;
String text = "name="+name;
logger.error(location, duccid, text, e);
throw e;
}
return value;
}
public enum keys {
publication_seqno,
duccwork_seqno,
}
private void initPublicationSeqNo() {
String location = "initPublicationSeqNo";
try {
String value = fetch(keys.publication_seqno.name());
Long.parseLong(value);
}
catch(Exception e) {
try {
long value = 0;
insert(keys.publication_seqno.name(),""+value);
}
catch(Exception x) {
logger.error(location, jobid, x);
}
}
}
@Override
public void setPublicationSeqNoIfLarger(long value) {
long curr = value;
long prev = getPublicationSeqNo();
if(curr > prev) {
setPublicationSeqNo(curr);
}
}
@Override
public void setPublicationSeqNo(long value) {
String location = "setPublicationSeqNo";
try {
upsert(keys.publication_seqno.name(),""+value);
}
catch(Exception e) {
logger.error(location, jobid, e);
}
}
@Override
public long getPublicationSeqNo() {
String location = "getPublicationSeqNo";
long retVal = 0;
try {
String value = fetch(keys.publication_seqno.name());
retVal = Long.parseLong(value);
}
catch(Exception e) {
logger.error(location, jobid, e);
}
return retVal;
}
//
@Override
public void setDuccWorkSeqNoIfLarger(long value) {
String location = "setDuccWorkSeqNoIfLarger";
long curr = value;
long prev = getDuccWorkSeqNo();
if(curr > prev) {
setDuccWorkSeqNo(curr);
logger.debug(location, jobid, curr);
}
}
private void initDuccWorkSeqNo() {
String location = "initDuccWorkSeqNo";
long lval = 0;
try {
String value = fetch(keys.duccwork_seqno.name());
lval = Long.parseLong(value);
}
catch(Exception e) {
try {
long value = 0;
insert(keys.duccwork_seqno.name(),""+value);
}
catch(Exception x) {
logger.error(location, jobid, x);
}
}
logger.debug(location, jobid, lval);
}
@Override
public void setDuccWorkSeqNo(long value) {
String location = "setDuccWorkSeqNo";
try {
upsert(keys.duccwork_seqno.name(),""+value);
}
catch(Exception e) {
logger.error(location, jobid, e);
}
logger.debug(location, jobid, value);
}
@Override
public long getDuccWorkSeqNo() {
String location = "getDuccWorkSeqNo";
long retVal = 0;
try {
String value = fetch(keys.duccwork_seqno.name());
retVal = Long.parseLong(value);
}
catch(Exception e) {
logger.error(location, jobid, e);
}
logger.debug(location, jobid, retVal);
return retVal;
}
//////////
@Override
public long getNextPublicationSeqNo() {
synchronized(DbOrchestratorProperties.class) {
long retVal = getPublicationSeqNo()+1;
setPublicationSeqNo(retVal);
return retVal;
}
}
@Override
public long getNextDuccWorkSeqNo() {
String location = "getNextDuccWorkSeqNo";
synchronized(DbOrchestratorProperties.class) {
long retVal = getDuccWorkSeqNo()+1;
setDuccWorkSeqNo(retVal);
logger.debug(location, jobid, retVal);
return retVal;
}
}
}