blob: 209cb2d60b6ac451b41dbdcf3c946fe066ea46c7 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.uima.ducc.database;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.uima.ducc.common.Pair;
import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.transport.event.common.ADuccWorkExecutable;
import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
import org.apache.uima.ducc.transport.event.common.IDuccReservation;
import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
import org.apache.uima.ducc.transport.event.common.IDuccWork;
import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
import org.apache.uima.ducc.transport.event.common.ITimeWindow;
import org.apache.uima.ducc.transport.event.common.history.IHistoryPersistenceManager;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
public class HistoryManagerDb
implements IHistoryPersistenceManager
private DuccLogger logger = null;
private DbManager dbManager;
private long ckptBytesMax = -1;
PreparedStatement jobBlobPrepare = null;
PreparedStatement reservationBlobPrepare = null;
PreparedStatement serviceBlobPrepare = null;
PreparedStatement ckptPrepare = null;
PreparedStatement processDetailsPrepare = null; // "process" for things that aren't "reservations"
PreparedStatement reservationAllocPrepare = null; // "process" for things that are "reservaitons"
PreparedStatement jobDetailsPrepare = null;
PreparedStatement reservationDetailsPrepare = null;
static final String JOB_HISTORY_TABLE = OrWorkProps.JOB_HISTORY_TABLE.pname();
static final String RES_HISTORY_TABLE = OrWorkProps.RESERVATION_HISTORY_TABLE.pname();
static final String SVC_HISTORY_TABLE = OrWorkProps.SERVICE_HISTORY_TABLE.pname();
static final String CKPT_TABLE = OrCkptProps.CKPT_TABLE.pname();
static final String PROCESS_TABLE = OrProcessProps.TABLE_NAME.pname();
static final String JOB_TABLE = OrJobProps.TABLE_NAME.pname();
static final String RESERVATION_TABLE = OrReservationProps.TABLE_NAME.pname();
static String[] alltables = {JOB_HISTORY_TABLE,
// Jira 4804 For now don't save details in tables: jobs, reservations, & processes
static final boolean saveDetails = System.getenv("SAVE_DB_DETAILS") == null ? false : true;
public HistoryManagerDb()
private boolean init(String[] dburls, DbManager dbm)
throws Exception
String methodName = "init";
boolean ret = true;, null, "Initializing OR persistence over the database");
while ( true ) {
try {
if ( dbm != null ) {
this.dbManager = dbm;
} else {
dbManager = new DbManager(dburls, logger);
// prepare some statements
DbHandle h =;
jobBlobPrepare = h.prepare("INSERT INTO " + JOB_HISTORY_TABLE + " (ducc_id, type, history, work) VALUES (?, ?, ?, ?) ;");
reservationBlobPrepare = h.prepare("INSERT INTO " + RES_HISTORY_TABLE + " (ducc_id, type, history, work) VALUES (?, ?, ?, ?) ;");
serviceBlobPrepare = h.prepare("INSERT INTO " + SVC_HISTORY_TABLE + " (ducc_id, type, history, work) VALUES (?, ?, ?, ?) ;");
ckptPrepare = h.prepare("INSERT INTO " + CKPT_TABLE + " (id, work, p2jmap) VALUES (?, ?, ?);");
if (saveDetails) { // Jira-4804
processDetailsPrepare = h.prepare("INSERT INTO " + PROCESS_TABLE + " (host, ducc_id, share_id, type, user, memory, start, stop, class, pid, reason_agent, exit_code, reason_scheduler, cpu, swap_max, run_time, init_time, initialized, investment, major_faults, gc_count, gc_time) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ;");
reservationAllocPrepare = h.prepare("INSERT INTO " + PROCESS_TABLE + " (host, ducc_id, share_id, type, user, memory, start, stop, class, run_time) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ;");
jobDetailsPrepare = h.prepare("INSERT INTO " + JOB_TABLE + " (user, class, ducc_id, submission_time, duration, memory, reason, init_fails, errors, pgin, swap, total_wi, retries, preemptions, description) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ;");
reservationDetailsPrepare = h.prepare("INSERT INTO " + RESERVATION_TABLE + " (user, class, ducc_id, submission_time, duration, memory, reason, processes, state, type, hosts, description) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");
} // Jira-4804
} catch ( NoHostAvailableException e ) {
logger.error(methodName, null, "Cannot contact database. Retrying in 5 seconds.");
} catch ( Exception e ) {
logger.error(methodName, null, "Errors contacting database. No connetion made.", e);
ret = false;
return ret;
public boolean init(DuccLogger logger)
throws Exception
this.logger = logger;
String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
return init(dbUrls, null);
// package only, for the loader
boolean init(DuccLogger logger, DbManager dbManager)
throws Exception
this.logger = logger;
String dbUrlsString = System.getProperty(DbManager.URL_PROPERTY);
String[] dbUrls = DbUtil.dbServersStringToArray(dbUrlsString);
return init(dbUrls, dbManager);
* For bulk loader, we drop some of the indices during loading.
* Some of the tables are not a problem during bulk loading so we only externalize the
* indexes on some tables.
static ArrayList<SimpleStatement> dropIndices()
ArrayList<SimpleStatement> ret = new ArrayList<SimpleStatement>();
List<String> indexes = DbUtil.dropIndices(OrProcessProps.values(), PROCESS_TABLE);
for ( String s : indexes ) {
ret.add(new SimpleStatement(s));
indexes = DbUtil.dropIndices(OrJobProps.values(), JOB_TABLE);
for ( String s : indexes ) {
ret.add(new SimpleStatement(s));
indexes = DbUtil.dropIndices(OrReservationProps.values(), RESERVATION_TABLE);
for ( String s : indexes ) {
ret.add(new SimpleStatement(s));
return ret;
* For bulk loader, we must recreate indices
* Some of the tables are not a problem during bulk loading so we only externalize the
* indexes on some tables.
static ArrayList<SimpleStatement> createIndices()
ArrayList<SimpleStatement> ret = new ArrayList<SimpleStatement>();
List<String> indexes = DbUtil.mkIndices(OrProcessProps.values(), PROCESS_TABLE);
for ( String s : indexes ) {
ret.add(new SimpleStatement(s));
indexes = DbUtil.mkIndices(OrJobProps.values(), JOB_TABLE);
for ( String s : indexes ) {
ret.add(new SimpleStatement(s));
indexes = DbUtil.mkIndices(OrReservationProps.values(), RESERVATION_TABLE);
for ( String s : indexes ) {
ret.add(new SimpleStatement(s));
return ret;
* Schema gen. Do anything you want to make the schema, but notice that DbUtil has a few convenience methods if
* you want to define your schema in a magic enum.
static ArrayList<SimpleStatement> mkSchema(String tablename)
throws Exception
ArrayList<SimpleStatement> ret = new ArrayList<SimpleStatement>();
StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + tablename + " (");
buf.append("WITH CLUSTERING ORDER BY (ducc_id desc)");
ret.add(new SimpleStatement(buf.toString()));
List<String> indexes = DbUtil.mkIndices(OrWorkProps.values(), tablename);
for ( String s : indexes ) {
ret.add(new SimpleStatement(s));
return ret;
static ArrayList<SimpleStatement> mkSchema()
throws Exception
ArrayList<SimpleStatement> ret = new ArrayList<SimpleStatement>();
StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + CKPT_TABLE + " (");
ret.add(new SimpleStatement(buf.toString()));
if (saveDetails) { // Jira 4804
buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + PROCESS_TABLE + " (");
ret.add(new SimpleStatement(buf.toString()));
buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + JOB_TABLE + " (");
ret.add(new SimpleStatement(buf.toString()));
buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + RESERVATION_TABLE + " (");
ret.add(new SimpleStatement(buf.toString()));
// PLEASE NOTE: The process, job, and reservation tables can have 10000s, 100000s or 1000000s of records during bulk
// load of a system that has been running a while during execution of DbLoader. The DbLoader will drop the indexes
// on these three tables and then recreate them. To support this we break out the creation into another
// routine that can be called from the loader.
} // Jira 4804
return ret;
// ----------------------------------------------------------------------------------------------------
int toInt(String i)
if ( i == null ) return 0;
try {
return Integer.parseInt(i);
} catch ( Exception e ) {
return 0;
String getString(String s)
return s == null ? "<none>" : s;
void summarizeJob(DbHandle h, IDuccWork w, String type)
throws Exception
IDuccWorkJob j = (IDuccWorkJob) w;
// need duccid, user, class, submission-time, duration, memory, exit-reason, init-fails, pgin, swap, total-wi, retries, preemptions, description
long ducc_id = j.getDuccId().getFriendly();
IDuccStandardInfo dsi = j.getStandardInfo();
IDuccSchedulingInfo dsx = j.getSchedulingInfo();
String user = dsi.getUser();
String jclass = getString(dsx.getSchedulingClass());
int memory = toInt(dsx.getMemorySizeRequested());
long submission = dsi.getDateOfSubmissionMillis();
long completion = dsi.getDateOfCompletionMillis();
long duration = Math.max(0, completion - submission);
String reason = getString(j.getCompletionType().toString());
int init_fails = (int) j.getProcessInitFailureCount();
long pgin = j.getPgInCount();
long swap = (long) j.getSwapUsageGbMax();
int wi = (int) j.getWiTotal();
int errors = toInt(dsx.getWorkItemsError());
int retries = toInt(dsx.getWorkItemsRetry());
int preemptions = toInt(dsx.getWorkItemsPreempt());
String description = getString(dsi.getDescription());
h.execute(jobDetailsPrepare, user, jclass, ducc_id, submission, duration, memory, reason, init_fails, errors, pgin, swap, wi, retries, preemptions, description);
void summarizeProcesses(DbHandle h, IDuccWork w, String type)
throws Exception
// Loop through the processes on w saving useful things:
// jobid, processid, node, user, type of job, PID, duration, start timestamp, stop timestamp, exit state,
// memory, CPU, exit code, swap max, investment, init time
long work_id = w.getDuccId().getFriendly();
switch ( w.getDuccType() ) {
case Job:
case Service:
case Pop:
IDuccProcessMap m = ((ADuccWorkExecutable)w).getProcessMap();
Map<DuccId, IDuccProcess> map = m.getMap();
IDuccStandardInfo dsi = w.getStandardInfo();
IDuccSchedulingInfo dsx = w.getSchedulingInfo();
String user = dsi.getUser();
int memory = toInt(dsx.getMemorySizeRequested());
String sclass = dsx.getSchedulingClass();
for ( IDuccProcess idp : map.values() ) {
long share_id = idp.getDuccId().getFriendly();
long pid = toInt(idp.getPID());
String node = getString(idp.getNodeIdentity().getCanonicalName());
String reason_agent = getString(idp.getReasonForStoppingProcess()); // called "reason" in duccprocess but not in ws
String extended_reason_agent = getString(idp.getExtendedReasonForStoppingProcess());
String reason_scheduler = getString(idp.getProcessDeallocationType().toString()); // called "processDeallocationType" in duccprocess but not in ws
int exit_code = idp.getProcessExitCode();
long cpu = idp.getCurrentCPU();
long swap = idp.getSwapUsageMax();
ITimeWindow itw = idp.getTimeWindowInit();
long processStart = 0;
long initTime = 0;
if ( itw != null ) {
processStart = itw.getStartLong();
initTime = itw.getElapsedMillis();
itw = idp.getTimeWindowRun();
long processEnd = 0;
if ( itw != null ) {
processEnd = idp.getTimeWindowRun().getEndLong();
boolean initialized = idp.isInitialized();
long investment = idp.getWiMillisInvestment();
long major_faults = idp.getMajorFaults();
long gccount = 0;
long gctime = 0;
ProcessGarbageCollectionStats gcs = idp.getGarbageCollectionStats();
if ( gcs != null ) {
gccount = gcs.getCollectionCount();
gctime = gcs.getCollectionTime();
h.execute(processDetailsPrepare, node, work_id, share_id, type, user, memory, processStart, processEnd, sclass,
pid, reason_agent, extended_reason_agent, exit_code, reason_scheduler, cpu, swap, Math.max(0, (processEnd-processStart)), initTime,
initialized, investment, major_faults, gccount, gctime);
case Reservation:
IDuccReservationMap m = ((IDuccWorkReservation)w).getReservationMap();
Map<DuccId, IDuccReservation> map = m.getMap();
IDuccStandardInfo dsi = w.getStandardInfo();
IDuccSchedulingInfo dsx = w.getSchedulingInfo();
long start = dsi.getDateOfCompletionMillis();
long stop = dsi.getDateOfSubmissionMillis();
int memory_size = 0;
if ( dsx.getMemorySizeRequested() == null ) {
memory_size = toInt(dsx.getMemorySizeRequested());
for ( IDuccReservation idr : map.values() ) {
String node = "<none>";
if ( idr.getNode() != null ) {
node = idr.getNode().getNodeIdentity().getCanonicalName();
try {
h.execute(reservationAllocPrepare, node, work_id,
idr.getDuccId().getFriendly(), type, getString(dsi.getUser()), memory_size,
start, stop, getString(dsx.getSchedulingClass()), Math.max(0, (stop-start)) );
} catch (Exception e) {
// TODO Auto-generated catch block
default :
void summarizeReservation(DbHandle h, IDuccWork w)
throws Exception
DuccWorkReservation r = (DuccWorkReservation) w; // cannot use the interface because it is incomplete
long ducc_id = r.getDuccId().getFriendly();
IDuccStandardInfo dsi = r.getStandardInfo();
IDuccSchedulingInfo dsx = r.getSchedulingInfo();
String user = dsi.getUser();
String jclass = dsx.getSchedulingClass();
int memory = toInt(dsx.getMemorySizeRequested());
long submission = dsi.getDateOfSubmissionMillis();
long completion = dsi.getDateOfCompletionMillis();
long duration = Math.max(0, completion - submission);
String reason = getString(r.getCompletionType().toString());
String description = getString(dsi.getDescription());
List<String> nodes = r.getNodes();
int processes = nodes.size();
StringBuffer buf = new StringBuffer("");
for ( int i = 0; i < processes; i++ ) {
if ( i < (processes-1) ) buf.append(" ");
String hosts = buf.toString();
String type = "R";
String state = r.getReservationState().toString();
h.execute(reservationDetailsPrepare, user, jclass, ducc_id, submission, duration, memory, reason, processes, state, type, hosts, description);
void saveWork(PreparedStatement s, IDuccWork w, boolean isHistory)
throws Exception
String methodName = "saveWork";
Long nowP = System.currentTimeMillis();
String type = null;
String processType = null;
switch ( w.getDuccType() ) {
case Job:
type = "job";
processType = "J";
case Service:
case Pop:
switch ( ((IDuccWorkService)w).getServiceDeploymentType() )
case uima:
case custom:
type = "service";
processType = "S";
case other:
type = "AP";
processType = "A";
default :
case Reservation:
type = "reservation";
processType = "R";
// ?? why are unmanaged reservations saved in processes? because they have a share/rm-id ?
// illegal - internal error if this happens
logger.error(methodName, w.getDuccId(), "Unknown job type", w.getDuccType(), "Cannot save to database.");
}, w.getDuccId(), "saving " + type);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
byte[] bytes = baos.toByteArray();
ByteBuffer buf = ByteBuffer.wrap(bytes);
DbHandle h =;
h.saveObject(s, w.getDuccId().getFriendly(), type, isHistory, buf);
if (saveDetails) { // Jira-4804
switch ( w.getDuccType() ) {
case Job:
summarizeJob(h, w, "J");
case Service:
case Pop:
case Reservation:
break; // Can't get here, we'd abort above in this case
summarizeProcesses(h, w, processType); // summarize each process of the work
} // Jira-4804
logger.trace(methodName, w.getDuccId(), "----------> Time to save", type, ":", System.currentTimeMillis() - nowP, "Size:", bytes.length, "bytes.");
* Part of history management, recover the indicated job from history.
<T> T restoreWork(Class<T> cl, String tablename, long friendly_id)
throws Exception
String methodName = "restoreWork";
T ret = null;
DbHandle h = null;
h =;
String cql = "SELECT WORK FROM " + tablename + " WHERE DUCC_ID=" + Long.toString(friendly_id);
ResultSet rs = h.execute(cql);
for ( Row r : rs ) {, null, "----- Restoring", friendly_id);
ByteBuffer bbWork = r.getBytes("work");
byte[] workbytes = bbWork.array();
ByteArrayInputStream bais = new ByteArrayInputStream(workbytes);
ObjectInputStream ois = new ObjectInputStream(bais);
ret= (T) ois.readObject();
return ret;
* Part of history management, recover ths indicated jobs from history.
* Reminder to self, we need to pass Clas<T> cl so compiler can infer T.
public <T> ArrayList<T> restoreSeveralThings(Class<T> cl, String tablename, String where_clause, long max)
throws Exception
String methodName = "restoreSeveralThings";
ArrayList<T> ret = new ArrayList<T>();
DbHandle h =;
StringBuffer sb = new StringBuffer();
sb.append("SELECT * from ");
if(where_clause != null) {
sb.append(" "+where_clause);
sb.append(" limit "+max);
sb.append(" ALLOW FILTERING");
String query = sb.toString();, null, query);
SimpleStatement s = new SimpleStatement(query);
long now = System.currentTimeMillis();
try {
int count = 0;
int nbytes = 0;
ResultSet rs = h.execute(s);
for ( Row r : rs ) {
ByteBuffer b = r.getBytes("work");
byte[] workbytes = b.array();
nbytes += workbytes.length;
ByteArrayInputStream bais = new ByteArrayInputStream(workbytes);
ObjectInputStream ois = new ObjectInputStream(bais);
ret.add( (T) ois.readObject());
}, null, "Found", count, "results. Total bytes", nbytes, "Time:", System.currentTimeMillis() - now);
} catch (Exception e) {
logger.error(methodName, null, "Error fetching history:", e);
return ret;
* For use by normal operation: forces an existence check. This saves history only.
public void saveJob(IDuccWorkJob j)
throws Exception
saveWork(jobBlobPrepare, j, true);
* Part of history management, recover ths indicated job from history.
public IDuccWorkJob restoreJob(long friendly_id)
throws Exception
return (IDuccWorkJob) restoreWork(IDuccWorkJob.class, JOB_HISTORY_TABLE, friendly_id);
* Part of history management, recover ths indicated jobs from history.
public ArrayList<IDuccWorkJob> restoreJobs(long max)
throws Exception
return restoreSeveralThings(IDuccWorkJob.class, JOB_HISTORY_TABLE, null, max);
// End of jobs section
// ----------------------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------------------
// Reservations section
// Save to history only
public void saveReservation(IDuccWorkReservation r)
throws Exception
saveWork(reservationBlobPrepare, r, true);
* Part of history management, recover ths indicated reservation from history.
public IDuccWorkReservation restoreReservation(long duccid)
throws Exception
return (IDuccWorkReservation) restoreWork(IDuccWorkReservation.class, RES_HISTORY_TABLE, duccid);
* Part of history management, recover ths indicated reservations from history.
public ArrayList<IDuccWorkReservation> restoreReservations(long max)
throws Exception
return restoreSeveralThings(IDuccWorkReservation.class, RES_HISTORY_TABLE, null, max);
// End of reservations section
// ----------------------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------------------
// Services section
public void saveService(IDuccWorkService s)
throws Exception
saveWork(serviceBlobPrepare, s, true);
* Part of history management, recover ths indicated service instance from history.
public IDuccWorkService restoreService(long duccid)
throws Exception
return (IDuccWorkService) restoreWork(IDuccWorkService.class, SVC_HISTORY_TABLE, duccid);
* Part of history management, recover ths indicated service instances from history.
public ArrayList<IDuccWorkService> restoreServices(long max)
throws Exception
return restoreSeveralThings(IDuccWorkService.class, SVC_HISTORY_TABLE, "WHERE TYPE = 'service'", max);
* Part of history management, recover these indicated AP instances from history.
public ArrayList<IDuccWorkService> restoreArbitraryProcesses(long max)
throws Exception
return restoreSeveralThings(IDuccWorkService.class, SVC_HISTORY_TABLE, "WHERE TYPE = 'AP'", max);
// End of services section
// ----------------------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------------------
// Orchecstrator Checkpoint save and restore. We save as discrete objects (unlike file-based checkpoint)
// so they can be included in queries.
* Orchestrator checkpoint, save the live orchestrator state to the database.
* @param work A map of all 'live' work, Jobs, Reservations, APs, Service instances
* @param processToJob Maps each specific process to the controlling work (job, reservation, ap, service instance)
* @TODO Do we even need processToJob? Can't it be derived from the work map? This question needs to be
* resolved by the owner of the OR. For now we just do it.
public boolean checkpoint(DuccWorkMap work, Map<DuccId, DuccId> processToJob)
throws Exception
String methodName = "checkpoint";
long now = System.currentTimeMillis();
boolean ret = true;
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
byte[] bytes = baos.toByteArray();
ByteBuffer workbuf = ByteBuffer.wrap(bytes);
if ( logger.isTrace() ) {
Map<DuccId, IDuccWork> map = work.getMap();
for ( DuccId id : map.keySet() ) {
IDuccWork w = map.get(id);
logger.trace(methodName, id, "Checkpointing", w.getClass());
baos = new ByteArrayOutputStream();
out= new ObjectOutputStream(baos);
bytes = baos.toByteArray();
ByteBuffer mapbuf = ByteBuffer.wrap(bytes);
// Just insert/update the one row of checkpoint data - Jira 4892 - don't truncate as it creates snapshots
DbHandle h =;
h.saveObject(ckptPrepare, 0, workbuf, mapbuf);
long ckptBytes = workbuf.array().length + mapbuf.array().length;
if(ckptBytes > ckptBytesMax) {
ckptBytesMax = ckptBytes;
}, null, "bytes="+ckptBytes+" "+"maxbytes="+ckptBytesMax);
} catch ( Exception e ) {
logger.error(methodName, null, "Cannot save ProcessToJob map", e);
ret = false;
} finally {
if ( ret ) logger.trace(methodName, null, "Saved Orchestrator Checkpoint");
logger.trace(methodName, null, "Total time to save checkpoint:", System.currentTimeMillis() - now);
return ret;
* Orchestrator checkpoint. Restore the checkpoint from the DB. Caller must initialize
* empty maps, which we fill in.
public Pair<DuccWorkMap, Map<DuccId, DuccId>> restore()
throws Exception
String methodName = "restore";
DbHandle h = null;
Pair<DuccWorkMap, Map<DuccId, DuccId>> ret = new Pair<DuccWorkMap, Map<DuccId, DuccId>>();
try {
h =;
String cql = "SELECT * FROM ducc.orckpt WHERE id=0";
ResultSet rs = h.execute(cql);
for ( Row r : rs ) {, null, "Found checkpoint.");
if(r == null) {
ByteBuffer bbWork = r.getBytes("work");
ByteBuffer bbmap = r.getBytes("p2jmap");
byte[] workbytes = bbWork.array();
ByteArrayInputStream bais = new ByteArrayInputStream(workbytes);
ObjectInputStream ois = new ObjectInputStream(bais);
DuccWorkMap work = (DuccWorkMap) ois.readObject();
workbytes = bbmap.array();
bais = new ByteArrayInputStream(workbytes);
ois = new ObjectInputStream(bais);
Map<DuccId, DuccId> processToJob = (Map<DuccId, DuccId>) ois.readObject();
// hack because java serializion is stupid and won't call the no-args constructor - need
// to restore sometransient fields
Set<DuccId> ids = work.getReservationKeySet();
for ( DuccId id : ids ) {
DuccWorkReservation res = (DuccWorkReservation) work.findDuccWork(DuccType.Reservation, ""+id.getFriendly());
if ( r != null ) res.initLogger();
// only gets called once per boot and might be useful, let's leave at info
Map<DuccId, IDuccWork> map = work.getMap();
for ( DuccId id : map.keySet() ) {
IDuccWork w = map.get(id);, id, "Restored", w.getClass());
ret = new Pair<DuccWorkMap, Map<DuccId, DuccId>>(work, processToJob);
} catch ( Exception e ) {
logger.error(methodName, null, "Error restoring checkpoint:", e);
return ret;
// End of OR checkpoint save and restore
// ----------------------------------------------------------------------------------------------------
public void shutdown()
// End of common
// ----------------------------------------------------------------------------------------------------