GORA-651 upgrade Orient DB driver to version 3 (#210)
diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java
index 3ee0d6f..e0b8ca9 100644
--- a/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java
+++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/query/OrientDBResult.java
@@ -21,9 +21,9 @@
import java.io.IOException;
import java.util.Iterator;
-import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
+import com.orientechnologies.orient.core.db.ODatabaseSession;
import com.orientechnologies.orient.core.record.impl.ODocument;
-import com.orientechnologies.orient.core.sql.query.OConcurrentResultSet;
+import com.orientechnologies.orient.core.sql.query.OConcurrentLegacyResultSet;
import org.apache.gora.orientdb.store.OrientDBStore;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
@@ -41,7 +41,7 @@
/**
* Reference to the OrientDB Results set.
*/
- private OConcurrentResultSet<ODocument> resultSet;
+ private OConcurrentLegacyResultSet<ODocument> resultSet;
private int size;
private static final Logger log = LoggerFactory.getLogger(OrientDBResult.class);
private Iterator<ODocument> resultSetIterator;
@@ -52,7 +52,7 @@
public OrientDBResult(DataStore<K, T> dataStore,
Query<K, T> query,
- OConcurrentResultSet<ODocument> resultSet) {
+ OConcurrentLegacyResultSet<ODocument> resultSet) {
super(dataStore, query);
this.resultSet = resultSet;
this.size = resultSet.size();
@@ -81,7 +81,7 @@
@Override
protected boolean nextInner() throws IOException {
- ODatabaseDocumentTx loadTx = ((OrientDBStore<K, T>) getDataStore())
+ ODatabaseSession loadTx = ((OrientDBStore<K, T>) getDataStore())
.getConnectionPool().acquire();
loadTx.activateOnCurrentThread();
try {
diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java
index a62e1a4..eee387f 100644
--- a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java
+++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStore.java
@@ -36,10 +36,14 @@
import com.github.raymanrt.orientqb.query.Parameter;
import com.gitub.raymanrt.orientqb.delete.Delete;
-import com.orientechnologies.orient.client.remote.OServerAdmin;
+import com.orientechnologies.orient.core.config.OGlobalConfiguration;
+import com.orientechnologies.orient.core.db.ODatabasePool;
+import com.orientechnologies.orient.core.db.ODatabaseSession;
+import com.orientechnologies.orient.core.db.ODatabaseType;
import com.orientechnologies.orient.core.db.OPartitionedDatabasePool;
-import com.orientechnologies.orient.core.db.OPartitionedDatabasePoolFactory;
-import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
+import com.orientechnologies.orient.core.db.OrientDB;
+import com.orientechnologies.orient.core.db.OrientDBConfig;
+import com.orientechnologies.orient.core.db.OrientDBConfigBuilder;
import com.orientechnologies.orient.core.db.record.OTrackedList;
import com.orientechnologies.orient.core.db.record.OTrackedMap;
import com.orientechnologies.orient.core.db.record.OTrackedSet;
@@ -47,7 +51,7 @@
import com.orientechnologies.orient.core.metadata.schema.OType;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.sql.OCommandSQL;
-import com.orientechnologies.orient.core.sql.query.OConcurrentResultSet;
+import com.orientechnologies.orient.core.sql.query.OConcurrentLegacyResultSet;
import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery;
import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
@@ -82,10 +86,12 @@
private String ROOT_DATABASE_URL;
private OrientDBStoreParameters orientDbStoreParams;
private OrientDBMapping orientDBMapping;
- private OServerAdmin remoteServerAdmin;
- private OPartitionedDatabasePool connectionPool;
+ private OrientDB remoteServerAdmin;
+ private ODatabasePool connectionPool;
private List<ODocument> docBatch = Collections.synchronizedList(new ArrayList<>());
private ReentrantLock flushLock = new ReentrantLock();
+ private int DEFAULT_DB_POOL_MIN_SIZE = 5;
+ private int DEFAULT_DB_POOL_MAX_SIZE = 10;
/**
* {@inheritDoc}
@@ -103,20 +109,36 @@
ROOT_URL = "remote:".concat(orientDbStoreParams.getServerHost()).concat(":")
.concat(orientDbStoreParams.getServerPort());
ROOT_DATABASE_URL = ROOT_URL.concat("/").concat(orientDbStoreParams.getDatabaseName());
- remoteServerAdmin = new OServerAdmin(ROOT_URL).connect(orientDbStoreParams.getUserName(),
- orientDbStoreParams.getUserPassword());
- if (!remoteServerAdmin.existsDatabase(orientDbStoreParams.getDatabaseName(), "memory")) {
- remoteServerAdmin.createDatabase(orientDbStoreParams.getDatabaseName(), "document", "memory");
+ remoteServerAdmin = new OrientDB(ROOT_URL, orientDbStoreParams.getUserName(),
+ orientDbStoreParams.getUserPassword(), OrientDBConfig.defaultConfig());
+ if (!remoteServerAdmin.exists(orientDbStoreParams.getDatabaseName())) {
+ remoteServerAdmin.create(orientDbStoreParams.getDatabaseName(),
+ ODatabaseType.valueOf(orientDbStoreParams.getStorageType().toUpperCase()));
}
- if (orientDbStoreParams.getConnectionPoolSize() != null) {
- int connPoolSize = Integer.valueOf(orientDbStoreParams.getConnectionPoolSize());
- connectionPool = new OPartitionedDatabasePoolFactory(connPoolSize)
- .get(ROOT_DATABASE_URL, orientDbStoreParams.getUserName(),
- orientDbStoreParams.getUserPassword());
+ if (orientDbStoreParams.getConnectionPoolMinSize() != null &&
+ orientDbStoreParams.getConnectionPoolMaxSize() != null) {
+ OrientDBConfigBuilder poolCfg = OrientDBConfig.builder();
+ poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MIN,
+ orientDbStoreParams.getConnectionPoolMinSize());
+ poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MAX,
+ orientDbStoreParams.getConnectionPoolMaxSize());
+
+ connectionPool = new ODatabasePool(remoteServerAdmin,
+ orientDbStoreParams.getDatabaseName(),
+ orientDbStoreParams.getUserName(),
+ orientDbStoreParams.getUserPassword(), poolCfg.build());
} else {
- connectionPool = new OPartitionedDatabasePoolFactory().get(ROOT_DATABASE_URL,
- orientDbStoreParams.getUserName(), orientDbStoreParams.getUserPassword());
+ OrientDBConfigBuilder poolCfg = OrientDBConfig.builder();
+ poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MIN,
+ DEFAULT_DB_POOL_MIN_SIZE);
+ poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MAX,
+ DEFAULT_DB_POOL_MAX_SIZE);
+
+ connectionPool = new ODatabasePool(remoteServerAdmin,
+ orientDbStoreParams.getDatabaseName(),
+ orientDbStoreParams.getUserName(),
+ orientDbStoreParams.getUserPassword(), poolCfg.build());
}
OrientDBMappingBuilder<K, T> builder = new OrientDBMappingBuilder<>(this);
@@ -159,7 +181,7 @@
return;
}
- try (ODatabaseDocumentTx schemaTx = connectionPool.acquire()) {
+ try (ODatabaseSession schemaTx = connectionPool.acquire()) {
schemaTx.activateOnCurrentThread();
OClass documentClass = schemaTx.getMetadata().getSchema().createClass(orientDBMapping.getDocumentClass());
@@ -181,7 +203,11 @@
*/
@Override
public void deleteSchema() throws GoraException {
- try (ODatabaseDocumentTx schemaTx = connectionPool.acquire()) {
+ if (!schemaExists()) {
+ return;
+ }
+
+ try (ODatabaseSession schemaTx = connectionPool.acquire()) {
schemaTx.activateOnCurrentThread();
schemaTx.getMetadata().getSchema().dropClass(orientDBMapping.getDocumentClass());
} catch (Exception e) {
@@ -195,7 +221,7 @@
*/
@Override
public boolean schemaExists() throws GoraException {
- try (ODatabaseDocumentTx schemaTx = connectionPool.acquire()) {
+ try (ODatabaseSession schemaTx = connectionPool.acquire()) {
schemaTx.activateOnCurrentThread();
return schemaTx.getMetadata().getSchema()
.existsClass(orientDBMapping.getDocumentClass());
@@ -222,8 +248,8 @@
Map<String, Object> params = new HashMap<String, Object>();
params.put("key", key);
OSQLSynchQuery<ODocument> query = new OSQLSynchQuery<ODocument>(selectQuery.toString());
-
- try (ODatabaseDocumentTx selectTx = connectionPool.acquire()) {
+
+ try (ODatabaseSession selectTx = connectionPool.acquire()) {
selectTx.activateOnCurrentThread();
List<ODocument> result = selectTx.command(query).execute(params);
if (result.size() == 1) {
@@ -247,7 +273,7 @@
dataStoreQuery.setEndKey(key);
dataStoreQuery.populateOrientDBQuery(orientDBMapping, getFieldsToQuery(null), getFields());
- try (ODatabaseDocumentTx selectTx = connectionPool.acquire()) {
+ try (ODatabaseSession selectTx = connectionPool.acquire()) {
selectTx.activateOnCurrentThread();
// TODO : further optimize for queries to separate cases update / insert == get rid of select all query
// TODO : for update
@@ -282,7 +308,7 @@
Map<String, Object> params = new HashMap<String, Object>();
params.put("key", key);
OCommandSQL query = new OCommandSQL(delete.toString().replace("DELETE", "DELETE FROM"));
- try (ODatabaseDocumentTx deleteTx = connectionPool.acquire()) {
+ try (ODatabaseSession deleteTx = connectionPool.acquire()) {
deleteTx.activateOnCurrentThread();
int deleteCount = deleteTx.command(query).execute(params);
if (deleteCount == 1) {
@@ -314,7 +340,7 @@
}
OCommandSQL dbQuery = new OCommandSQL(delete.toString().replace("DELETE", "DELETE FROM"));
- try (ODatabaseDocumentTx deleteTx = connectionPool.acquire()) {
+ try (ODatabaseSession deleteTx = connectionPool.acquire()) {
deleteTx.activateOnCurrentThread();
int deleteCount;
if (params.isEmpty()) {
@@ -337,7 +363,7 @@
dataStoreQuery.setEndKey(query.getEndKey());
dataStoreQuery.populateOrientDBQuery(orientDBMapping, getFieldsToQuery(null), getFields());
- try (ODatabaseDocumentTx selectTx = connectionPool.acquire()) {
+ try (ODatabaseSession selectTx = connectionPool.acquire()) {
selectTx.activateOnCurrentThread();
List<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery())
.execute(dataStoreQuery.getParams());
@@ -373,9 +399,9 @@
dataStoreQuery = (OrientDBQuery) ((PartitionQueryImpl<K, T>) query).getBaseQuery();
}
dataStoreQuery.populateOrientDBQuery(orientDBMapping, fields, getFields());
- try (ODatabaseDocumentTx selectTx = connectionPool.acquire()) {
+ try (ODatabaseSession selectTx = connectionPool.acquire()) {
selectTx.activateOnCurrentThread();
- OConcurrentResultSet<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery())
+ OConcurrentLegacyResultSet<ODocument> result = selectTx.command(dataStoreQuery.getOrientDBQuery())
.execute(dataStoreQuery.getParams());
result.setLimit((int) query.getLimit());
return new OrientDBResult<K, T>(this, query, result);
@@ -414,7 +440,7 @@
*/
@Override
public void flush() throws GoraException {
- try (ODatabaseDocumentTx updateTx = connectionPool.acquire()) {
+ try (ODatabaseSession updateTx = connectionPool.acquire()) {
updateTx.activateOnCurrentThread();
flushLock.lock();
for (ODocument document : docBatch) {
@@ -449,7 +475,7 @@
*
* @return {@link OPartitionedDatabasePool} OrientDB client connection pool.
*/
- public OPartitionedDatabasePool getConnectionPool() {
+ public ODatabasePool getConnectionPool() {
return connectionPool;
}
@@ -663,7 +689,7 @@
}
return map;
} else {
- ODocument doc = new ODocument();
+ ODocument doc = new ODocument("map" + docf);
if (value == null)
return doc;
for (Map.Entry<CharSequence, ?> e : value.entrySet()) {
@@ -915,7 +941,7 @@
private ODocument convertAvroBeanToOrientDoc(final String docf,
final Schema fieldSchema,
final Object value) {
- ODocument record = new ODocument();
+ ODocument record = new ODocument("record" + docf);
for (Schema.Field member : fieldSchema.getFields()) {
Object innerValue = ((PersistentBase) value).get(member.pos());
String innerDoc = orientDBMapping.getDocumentField(member.name());
diff --git a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java
index 8eebcc5..5e67a84 100644
--- a/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java
+++ b/gora-orientdb/src/main/java/org/apache/gora/orientdb/store/OrientDBStoreParameters.java
@@ -31,7 +31,8 @@
public static final String ORIENT_DB_USER_USERNAME = "gora.orientdb.user.username";
public static final String ORIENT_DB_USER_PASSWORD = "gora.orientdb.user.password";
public static final String ORIENT_DB_DB_NAME = "gora.orientdb.database.name";
- public static final String ORIENT_DB_CONNECTION_POOL_SIZE = "gora.orientdb.con.pool.size";
+ public static final String ORIENT_DB_CONNECTION_POOL_MIN_SIZE = "gora.orientdb.con.pool.min.size";
+ public static final String ORIENT_DB_CONNECTION_POOL_MAX_SIZE = "gora.orientdb.con.pool.max.size";
public static final String ORIENT_DB_STORAGE_TYPE = "gora.orientdb.storage.type";
@@ -41,7 +42,8 @@
private String userName;
private String userPassword;
private String databaseName;
- private String connPoolSize;
+ private String connPoolMinSize;
+ private String connPoolMaxSize;
private String storageType;
@@ -100,12 +102,21 @@
}
/**
- * Return remote OrientDB client connections pool size. Eg:- 80
+ * Return remote OrientDB client connections pool min size. Eg:- 80
*
- * @return OrientDB remote server client connections pool size as string.
+ * @return OrientDB remote server client connections pool min size as string.
*/
- public String getConnectionPoolSize() {
- return this.connPoolSize;
+ public String getConnectionPoolMinSize() {
+ return this.connPoolMinSize;
+ }
+
+ /**
+ * Return remote OrientDB client connections pool max size. Eg:- 100
+ *
+ * @return OrientDB remote server client connections pool max size as string.
+ */
+ public String getConnectionPoolMaxSize() {
+ return this.connPoolMaxSize;
}
/**
@@ -123,7 +134,8 @@
String userName,
String userPassword,
String databaseName,
- String connPoolSize,
+ String connPoolMinSize,
+ String connPoolMaxSize,
String storageType) {
this.mappingFile = mappingFile;
this.serverHost = serverHost;
@@ -131,7 +143,8 @@
this.userName = userName;
this.userPassword = userPassword;
this.databaseName = databaseName;
- this.connPoolSize = connPoolSize;
+ this.connPoolMinSize = connPoolMinSize;
+ this.connPoolMaxSize = connPoolMaxSize;
this.storageType = storageType;
}
@@ -149,10 +162,11 @@
String propUserName = properties.getProperty(ORIENT_DB_USER_USERNAME);
String propUserPassword = properties.getProperty(ORIENT_DB_USER_PASSWORD);
String propDatabaseName = properties.getProperty(ORIENT_DB_DB_NAME);
- String propConnPoolSize = properties.getProperty(ORIENT_DB_CONNECTION_POOL_SIZE);
+ String propConnPoolMinSize = properties.getProperty(ORIENT_DB_CONNECTION_POOL_MIN_SIZE);
+ String propConnPoolMaxSize = properties.getProperty(ORIENT_DB_CONNECTION_POOL_MAX_SIZE);
String propStorageType = properties.getProperty(ORIENT_DB_STORAGE_TYPE);
return new OrientDBStoreParameters(propMappingFile,
propServerHost, propServerPort, propUserName,
- propUserPassword, propDatabaseName, propConnPoolSize, propStorageType);
+ propUserPassword, propDatabaseName, propConnPoolMinSize, propConnPoolMaxSize, propStorageType);
}
}
diff --git a/gora-orientdb/src/test/resources/gora.properties b/gora-orientdb/src/test/resources/gora.properties
index 5a24e76..a1038f5 100644
--- a/gora-orientdb/src/test/resources/gora.properties
+++ b/gora-orientdb/src/test/resources/gora.properties
@@ -19,5 +19,6 @@
gora.orientdb.user.username=root
gora.orientdb.user.password=root
gora.orientdb.database.name=gora
-gora.orientdb.con.pool.size=80
+gora.orientdb.con.pool.min.size=10
+gora.orientdb.con.pool.max.size=12
gora.orientdb.storage.type=memory
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6eb492e..27e211d 100755
--- a/pom.xml
+++ b/pom.xml
@@ -881,7 +881,7 @@
<hazelcast.jet.version>3.1</hazelcast.jet.version>
<!-- OrientDB Dependencies -->
- <orientdb.version>2.2.22</orientdb.version>
+ <orientdb.version>3.0.30</orientdb.version>
<orientqb.version>0.2.0</orientqb.version>
<!-- RethinkDB Dependencies -->