blob: b255e0849dbfa147242f0fc35dd4cd579d2ca57a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.scheduler.simple;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.math.BigDecimal;
import javax.sql.DataSource;
import javax.xml.namespace.QName;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.utils.DbIsolation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.utils.DbIsolation;
import org.apache.ode.utils.StreamUtils;
/**
* JDBC-based implementation of the {@link DatabaseDelegate} interface. Should work with most
* reasonably behaved databases.
*
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*/
public class JdbcDelegate implements DatabaseDelegate {
private static final Logger __log = LoggerFactory.getLogger(JdbcDelegate.class);
private static final String DELETE_JOB = "delete from ODE_JOB where jobid = ? and nodeid = ?";
private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid = ?, scheduled = 0 where nodeid = ?";
private static final String UPDATE_JOB = "update ODE_JOB set ts = ?, retryCount = ? where jobid = ?";
private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null "
+ "and mod(ts,?) = ? and ts < ?";
private static final String UPGRADE_JOB_DB2 = "update ODE_JOB set nodeid = ? where nodeid is null "
+ "and mod(ts,CAST(? AS BIGINT)) = ? and ts < ?";
private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set nodeid = ? where nodeid is null "
+ "and (ts % ?) = ? and ts < ?";
private static final String UPGRADE_JOB_SYBASE = "update ODE_JOB set nodeid = ? where nodeid is null "
+ "and convert(int, ts) % ? = ? and ts < ?";
private static final String UPGRADE_JOB_SYBASE12 = "update ODE_JOB set nodeid = ? where nodeid is null "
+ "and -1 <> ? and -1 <> ? and ts < ?";
private static final String SAVE_JOB = "insert into ODE_JOB "
+ " (jobid, nodeid, ts, scheduled, transacted, "
+ "instanceId,"
+ "mexId,"
+ "processId,"
+ "type,"
+ "channel,"
+ "correlatorId,"
+ "correlationKeySet,"
+ "retryCount,"
+ "inMem,"
+ "detailsExt"
+ ") values(?, ?, ?, ?, ?,"
+ "?,"
+ "?,"
+ "?,"
+ "?,"
+ "?,"
+ "?,"
+ "?,"
+ "?,"
+ "?,"
+ "?"
+ ")";
private static final String GET_NODEIDS = "select distinct nodeid from ODE_JOB";
private static final String SCHEDULE_IMMEDIATE = "select jobid, ts, transacted, scheduled, "
+ "instanceId,"
+ "mexId,"
+ "processId,"
+ "type,"
+ "channel,"
+ "correlatorId,"
+ "correlationKeySet,"
+ "retryCount,"
+ "inMem,"
+ "detailsExt"
+ " from ODE_JOB "
+ "where nodeid = ? and ts < ? order by ts";
// public Long instanceId;
// public String mexId;
// public String processId;
// public String type;
// public String channel;
// public String correlatorId;
// public String correlationKeySet;
// public Integer retryCount;
// public Boolean inMem;
// public Map<String, Object> detailsExt = new HashMap<String, Object>();
private static final String UPDATE_SCHEDULED = "update ODE_JOB set scheduled = 1 where jobid in (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
private static final int UPDATE_SCHEDULED_SLOTS = 10;
private DataSource _ds;
private Dialect _dialect;
public JdbcDelegate(DataSource ds) {
_ds = ds;
_dialect = guessDialect();
}
public boolean deleteJob(String jobid, String nodeId) throws DatabaseException {
if (__log.isDebugEnabled())
__log.debug("deleteJob " + jobid + " on node " + nodeId);
Connection con = null;
PreparedStatement ps = null;
try {
con = getConnection();
ps = con.prepareStatement(DELETE_JOB);
ps.setString(1, jobid);
ps.setString(2, nodeId);
return ps.executeUpdate() == 1;
} catch (SQLException se) {
throw new DatabaseException(se);
} finally {
close(ps);
close(con);
}
}
public List<String> getNodeIds() throws DatabaseException {
Connection con = null;
PreparedStatement ps = null;
try {
con = getConnection();
ps = con.prepareStatement(GET_NODEIDS, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = ps.executeQuery();
ArrayList<String> nodes = new ArrayList<String>();
while (rs.next()) {
String nodeId = rs.getString(1);
if (nodeId != null)
nodes.add(rs.getString(1));
}
if (__log.isDebugEnabled())
__log.debug("getNodeIds: " + nodes);
return nodes;
} catch (SQLException se) {
throw new DatabaseException(se);
} finally {
close(ps);
close(con);
}
}
public boolean insertJob(Job job, String nodeId, boolean loaded) throws DatabaseException {
if (__log.isDebugEnabled())
__log.debug("insertJob " + job.jobId + " on node " + nodeId + " loaded=" + loaded);
Connection con = null;
PreparedStatement ps = null;
try {
int i = 1;
con = getConnection();
ps = con.prepareStatement(SAVE_JOB);
ps.setString(i++, job.jobId);
ps.setString(i++, nodeId);
ps.setLong(i++, job.schedDate);
ps.setInt(i++, asInteger(loaded));
ps.setInt(i++, asInteger(job.transacted));
JobDetails details = job.detail;
ps.setObject(i++, details.instanceId, Types.BIGINT);
ps.setObject(i++, details.mexId, Types.VARCHAR);
ps.setObject(i++, details.processId, Types.VARCHAR);
ps.setObject(i++, details.type, Types.VARCHAR);
ps.setObject(i++, details.channel, Types.VARCHAR);
ps.setObject(i++, details.correlatorId, Types.VARCHAR);
ps.setObject(i++, details.correlationKeySet, Types.VARCHAR);
ps.setObject(i++, details.retryCount, Types.INTEGER);
ps.setObject(i++, details.inMem, Types.INTEGER);
if (details.detailsExt == null || details.detailsExt.size() == 0) {
ps.setObject(i++, null, Types.BLOB);
} else {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
StreamUtils.write(bos, (Serializable) details.detailsExt);
} catch (Exception ex) {
__log.error("Error serializing job detail: " + job.detail);
throw new DatabaseException(ex);
}
ps.setBytes(i++, bos.toByteArray());
}
return ps.executeUpdate() == 1;
} catch (SQLException se) {
throw new DatabaseException(se);
} finally {
close(ps);
close(con);
}
}
public boolean updateJob(Job job) throws DatabaseException {
if (__log.isDebugEnabled())
__log.debug("updateJob " + job.jobId + " retryCount=" + job.detail.getRetryCount());
Connection con = null;
PreparedStatement ps = null;
try {
con = getConnection();
ps = con.prepareStatement(UPDATE_JOB);
ps.setLong(1, job.schedDate);
ps.setInt(2, job.detail.getRetryCount());
ps.setString(3, job.jobId);
return ps.executeUpdate() == 1;
} catch (SQLException se) {
throw new DatabaseException(se);
} finally {
close(ps);
close(con);
}
}
private Long asLong(Object o) {
if (o == null) return null;
else if (o instanceof BigDecimal) return ((BigDecimal) o).longValue();
else if (o instanceof Long) return (Long) o;
else if (o instanceof Integer) return ((Integer) o).longValue();
else throw new IllegalStateException("Can't convert to long " + o.getClass());
}
private Integer asInteger(Object o) {
if (o == null) return null;
else if (o instanceof BigDecimal) return ((BigDecimal) o).intValue();
else if (o instanceof Integer) return (Integer) o;
else throw new IllegalStateException("Can't convert to integer " + o.getClass());
}
@SuppressWarnings("unchecked")
public List<Job> dequeueImmediate(String nodeId, long maxtime, int maxjobs) throws DatabaseException {
ArrayList<Job> ret = new ArrayList<Job>(maxjobs);
Connection con = null;
PreparedStatement ps = null;
try {
con = getConnection();
ps = con.prepareStatement(SCHEDULE_IMMEDIATE);
ps.setString(1, nodeId);
ps.setLong(2, maxtime);
ps.setMaxRows(maxjobs);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
Scheduler.JobDetails details = new Scheduler.JobDetails();
details.instanceId = asLong(rs.getObject("instanceId"));
details.mexId = (String) rs.getObject("mexId");
details.processId = (String) rs.getObject("processId");
details.type = (String) rs.getObject("type");
details.channel = (String) rs.getObject("channel");
details.correlatorId = (String) rs.getObject("correlatorId");
details.correlationKeySet = (String) rs.getObject("correlationKeySet");
details.retryCount = asInteger(rs.getObject("retryCount"));
details.inMem = asBoolean(rs.getInt("inMem"));
if (rs.getObject("detailsExt") != null) {
try {
ObjectInputStream is = new ObjectInputStream(rs.getBinaryStream("detailsExt"));
details.detailsExt = (Map<String, Object>) is.readObject();
is.close();
} catch (Exception e) {
throw new DatabaseException("Error deserializing job detailsExt", e);
}
}
{
//For compatibility reasons, we check whether there are entries inside
//jobDetailsExt blob, which correspond to extracted entries. If so, we
//use them.
Map<String, Object> detailsExt = details.getDetailsExt();
if (detailsExt.get("type") != null) {
details.type = (String) detailsExt.get("type");
}
if (detailsExt.get("iid") != null) {
details.instanceId = (Long) detailsExt.get("iid");
}
if (detailsExt.get("pid") != null && detailsExt.get("pid") instanceof String) {
details.processId = (String) detailsExt.get("pid");
}
if (detailsExt.get("inmem") != null) {
details.inMem = (Boolean) detailsExt.get("inmem");
}
if (detailsExt.get("ckey") != null) {
details.correlationKeySet = (String) detailsExt.get("ckey");
}
if (detailsExt.get("channel") != null) {
details.channel = (String) detailsExt.get("channel");
}
if (detailsExt.get("mexid") != null) {
details.mexId = (String) detailsExt.get("mexid");
}
if (detailsExt.get("correlatorId") != null) {
details.correlatorId = (String) detailsExt.get("correlatorId");
}
if (detailsExt.get("retryCount") != null) {
details.retryCount = Integer.parseInt((String) detailsExt.get("retryCount"));
}
}
Job job = new Job(rs.getLong("ts"), rs.getString("jobid"), asBoolean(rs.getInt("transacted")), details);
ret.add(job);
}
rs.close();
ps.close();
} catch (SQLException se) {
throw new DatabaseException(se);
} finally {
close(ps);
close(con);
}
return ret;
}
public int updateReassign(String oldnode, String newnode) throws DatabaseException {
if (__log.isDebugEnabled())
__log.debug("updateReassign from " + oldnode + " ---> " + newnode);
Connection con = null;
PreparedStatement ps = null;
try {
con = getConnection();
ps = con.prepareStatement(UPDATE_REASSIGN);
ps.setString(1, newnode);
ps.setString(2, oldnode);
return ps.executeUpdate();
} catch (SQLException se) {
throw new DatabaseException(se);
} finally {
close(ps);
close(con);
}
}
public int updateAssignToNode(String node, int i, int numNodes, long maxtime) throws DatabaseException {
if (__log.isDebugEnabled())
__log.debug("updateAsssignToNode node=" + node + " " + i + "/" + numNodes + " maxtime=" + maxtime);
Connection con = null;
PreparedStatement ps = null;
try {
con = getConnection();
if (_dialect == Dialect.SQLSERVER) {
ps = con.prepareStatement(UPGRADE_JOB_SQLSERVER);
} else if (_dialect == Dialect.DB2) {
ps = con.prepareStatement(UPGRADE_JOB_DB2);
} else if (_dialect == Dialect.SYBASE) {
ps = con.prepareStatement(UPGRADE_JOB_SYBASE);
} else if (_dialect == Dialect.SYBASE12) {
ps = con.prepareStatement(UPGRADE_JOB_SYBASE12);
} else {
ps = con.prepareStatement(UPGRADE_JOB_DEFAULT);
}
ps.setString(1, node);
ps.setInt(2, numNodes);
ps.setInt(3, i);
ps.setLong(4, maxtime);
return ps.executeUpdate();
} catch (SQLException se) {
throw new DatabaseException(se);
} finally {
close(ps);
close(con);
}
}
private Connection getConnection() throws SQLException {
Connection c = _ds.getConnection();
DbIsolation.setIsolationLevel(c);
return c;
}
private int asInteger(boolean value) {
return (value ? 1 : 0);
}
private boolean asBoolean(int value) {
return (value != 0);
}
private void close(PreparedStatement ps) {
if (ps != null) {
try {
ps.close();
} catch (Exception e) {
__log.warn("Exception while closing prepared statement", e);
}
}
}
private void close(Connection con) {
if (con != null) {
try {
con.close();
} catch (Exception e) {
__log.warn("Exception while closing connection", e);
}
}
}
private Dialect guessDialect() {
Dialect d = Dialect.UNKNOWN;
Connection con = null;
try {
con = getConnection();
DatabaseMetaData metaData = con.getMetaData();
if (metaData != null) {
String dbProductName = metaData.getDatabaseProductName();
int dbMajorVer = metaData.getDatabaseMajorVersion();
__log.info("Using database " + dbProductName + " major version " + dbMajorVer);
if (dbProductName.indexOf("DB2") >= 0) {
d = Dialect.DB2;
} else if (dbProductName.indexOf("Derby") >= 0) {
d = Dialect.DERBY;
} else if (dbProductName.indexOf("Firebird") >= 0) {
d = Dialect.FIREBIRD;
} else if (dbProductName.indexOf("HSQL") >= 0) {
d = Dialect.HSQL;
} else if (dbProductName.indexOf("H2") >= 0) {
d = Dialect.H2;
} else if (dbProductName.indexOf("Microsoft SQL") >= 0) {
d = Dialect.SQLSERVER;
} else if (dbProductName.indexOf("MySQL") >= 0) {
d = Dialect.MYSQL;
} else if (dbProductName.indexOf("Sybase") >= 0 || dbProductName.indexOf("Adaptive") >= 0) {
d = Dialect.SYBASE;
if( dbMajorVer == 12 ) {
d = Dialect.SYBASE12;
}
}
}
} catch (SQLException e) {
__log.warn("Unable to determine database dialect", e);
} finally {
close(con);
}
__log.info("Using database dialect: " + d);
return d;
}
public void acquireTransactionLocks() {
Statement s = null;
Connection c = null;
try {
c = getConnection();
s = c.createStatement();
s.execute("update ODE_JOB set jobid = '' where 1 = 0");
} catch (Exception e) {
throw new RuntimeException("", e);
} finally {
try {
if (s != null) s.close();
if (c != null) c.close();
} catch (Exception e) {
throw new RuntimeException("", e);
}
}
}
public void deleteAllJobs() {
Statement s = null;
Connection c = null;
try {
c = getConnection();
s = c.createStatement();
s.execute("delete from ODE_JOB");
} catch (Exception e) {
throw new RuntimeException("", e);
} finally {
try {
if (s != null) s.close();
if (c != null) c.close();
} catch (Exception e) {
throw new RuntimeException("", e);
}
}
}
enum Dialect {
DB2, DERBY, FIREBIRD, HSQL, MYSQL, ORACLE, SQLSERVER, SYBASE, SYBASE12, H2, UNKNOWN
}
}