ODE-1033: incremented version number by holding DB exclusive lock
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
index cf7d194..fabd531 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
@@ -101,7 +101,7 @@
*/
private DataSource _inMemDs;
-
+ private static final ThreadLocal<Long> _currentVersion = new ThreadLocal<Long>();
public ProcessStoreImpl() {
this(null, null, "", new OdeConfigProperties(new Properties(), ""), true);
@@ -195,11 +195,12 @@
long version;
if (autoincrementVersion || du.getStaticVersion() == -1) {
// Process and DU use a monotonically increased single version number by default.
- version = exec(new Callable<Long>() {
- public Long call(ConfStoreConnection conn) {
- return conn.getNextVersion();
- }
- });
+ try {
+ version = getCurrentVersion();
+ } finally {
+ //we need to reset the current version thread local value.
+ _currentVersion.set(null);
+ }
} else {
version = du.getStaticVersion();
}
@@ -296,7 +297,6 @@
newDao.setProperty(prop.getKey(), DOMUtils.domToString(prop.getValue()));
}
deployed.add(pc.getProcessId());
- conn.setVersion(pc.getVersion());
} catch (Throwable e) {
String errmsg = "Error persisting deployment record for " + pc.getProcessId()
+ "; process will not be available after restart!";
@@ -583,12 +583,18 @@
}
public long getCurrentVersion() {
+ if (_currentVersion.get() != null){
+ return _currentVersion.get();
+ }
+
long version = exec(new Callable<Long>() {
public Long call(ConfStoreConnection conn) {
return conn.getNextVersion();
}
});
- return version;
+
+ _currentVersion.set(version);
+ return _currentVersion.get();
}
protected void fireEvent(ProcessStoreEvent pse) {
diff --git a/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java b/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java
index c8469ae..a7c7099 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java
@@ -25,6 +25,8 @@
import org.apache.ode.store.ProcessConfDAO;
import org.hibernate.Criteria;
import org.hibernate.HibernateException;
+import org.hibernate.LockMode;
+import org.hibernate.Query;
import org.hibernate.Session;
import javax.xml.namespace.QName;
@@ -77,21 +79,32 @@
}
public long getNextVersion() {
- VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl)
- _session.createQuery("from VersionTrackerDAOImpl v ").uniqueResult();
- if (vt == null) return 1;
- else return vt.getVersion() + 1;
+ Query q = _session.createQuery("from VersionTrackerDAOImpl v ");
+ q.setLockMode("v", LockMode.UPGRADE);
+ VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl) q.uniqueResult();
+ if (vt == null) {
+ vt = new VersionTrackerDAOImpl();
+ vt.setVersion(1);
+ }else {
+ vt.setVersion(vt.getVersion() + 1);
+ }
+ _session.save(vt);
+ return vt.getVersion();
}
public void setVersion(long version) {
_session.flush();
- VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl)
- _session.createQuery("from VersionTrackerDAOImpl v ").uniqueResult();
+
+ Query q = _session.createQuery("from VersionTrackerDAOImpl v ");
+ q.setLockMode("v", LockMode.UPGRADE);
+ VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl) q.uniqueResult();
if (vt == null) {
vt = new VersionTrackerDAOImpl();
vt.setId(1);
+ vt.setVersion(1);
+ } else {
+ vt.setVersion(version);
}
- vt.setVersion(version);
_session.save(vt);
}
diff --git a/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java b/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java
index fa2f4a7..72a9df9 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java
@@ -25,6 +25,7 @@
import org.apache.ode.store.DeploymentUnitDAO;
import javax.persistence.EntityManager;
+import javax.persistence.Query;
import java.util.Collection;
import java.util.Date;
import java.util.List;
@@ -64,19 +65,39 @@
}
public long getNextVersion() {
- List<VersionTrackerDAOImpl> res = _em.createQuery("select v from VersionTrackerDAOImpl v").getResultList();
- if (res.size() == 0) return 1;
- else {
- VersionTrackerDAOImpl vt = res.get(0);
- return vt.getVersion() + 1;
- }
+ VersionTrackerDAOImpl vt = null;
+ Query query = _em.createQuery("select v from VersionTrackerDAOImpl v");
+ query.setHint("openjpa.FetchPlan.ReadLockMode", "WRITE");
+
+ List<VersionTrackerDAOImpl> res = query.getResultList();
+
+ if(!res.isEmpty())
+ vt = res.get(0);
+
+ if (vt == null) {
+ vt = new VersionTrackerDAOImpl();
+ vt.setVersion(1);
+ } else {
+ vt.setVersion(vt.getVersion() + 1);
+ }
+
+ _em.persist(vt);
+ return vt.getVersion();
}
public void setVersion(long version) {
- List<VersionTrackerDAOImpl> res = _em.createQuery("select v from VersionTrackerDAOImpl v").getResultList();
- VersionTrackerDAOImpl vt;
- if (res.size() == 0) vt = new VersionTrackerDAOImpl();
- else vt = res.get(0);
+ VersionTrackerDAOImpl vt = null;
+ Query query = _em.createQuery("select v from VersionTrackerDAOImpl v");
+ query.setHint("openjpa.FetchPlan.ReadLockMode", "WRITE");
+
+ List<VersionTrackerDAOImpl> res = query.getResultList();
+
+ if(!res.isEmpty())
+ vt = res.get(0);
+
+ if (vt == null)
+ vt = new VersionTrackerDAOImpl();
+
vt.setVersion(version);
_em.persist(vt);
}
diff --git a/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java b/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java
index bd8508f..407e18a 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java
@@ -60,6 +60,7 @@
propMap.put("openjpa.ConnectionFactoryMode", "managed");
propMap.put("openjpa.FlushBeforeQueries", "false");
propMap.put("openjpa.FetchBatchSize", 1000);
+ propMap.put("openjpa.LockManager","pessimistic");
//dirty hack for ODE-1015
String skipIsolation = System.getProperty("openjpa.connection.isolation.skip", "N");