blob: 128514ddfdb1c2d9370304e36ab40601195d7c62 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.marmotta.kiwi.persistence;
import com.google.common.base.Preconditions;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.google.common.primitives.Longs;
import info.aduna.iteration.*;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.marmotta.commons.sesame.model.LiteralCommons;
import org.apache.marmotta.commons.sesame.model.Namespaces;
import org.apache.marmotta.commons.sesame.tripletable.TripleTable;
import org.apache.marmotta.kiwi.caching.CacheManager;
import org.apache.marmotta.kiwi.config.KiWiConfiguration;
import org.apache.marmotta.kiwi.exception.ResultInterruptedException;
import org.apache.marmotta.kiwi.model.rdf.*;
import org.apache.marmotta.kiwi.persistence.util.ResultSetIteration;
import org.apache.marmotta.kiwi.persistence.util.ResultTransformerFunction;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.openrdf.model.Literal;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.repository.RepositoryException;
import org.openrdf.repository.RepositoryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.*;
import java.util.Date;
import java.util.concurrent.locks.ReentrantLock;
/**
* A KiWiConnection offers methods for storing and retrieving KiWiTriples, KiWiNodes, and KiWiNamespaces in the
* database. It wraps a JDBC connection which will be committed on commit(), rolled back on rollback() and
* closed on close();
* <p/>
* Author: Sebastian Schaffert
*/
public class KiWiConnection implements AutoCloseable {
// Class-wide SLF4J logger.
private static Logger log = LoggerFactory.getLogger(KiWiConnection.class);
// SQL dialect in use; the constructor reads isBatchSupported() from it to initialise batchCommit.
protected KiWiDialect dialect;
// Underlying JDBC connection; stays null until requireJDBCConnection() obtains it from the persistence.
protected Connection connection;
// Persistence layer this connection belongs to; supplies the JDBC connection and the configuration.
protected KiWiPersistence persistence;
// Provider of the node, triple, URI, bnode, literal and namespace caches assigned in initCachePool().
protected CacheManager cacheManager;
// Created lazily in requireJDBCConnection(); its size is added to the database count in getSize().
// NOTE(review): presumably holds triples that have not been written to the database yet -- confirm.
protected TripleTable<KiWiTriple> tripleBatch;
/**
* Cache nodes by database ID
*/
private Map<Long,KiWiNode> nodeCache;
/**
* Cache triples by database ID
*/
private Map<Long,KiWiTriple> tripleCache;
/**
* Cache URI resources by uri
*/
private Map<String,KiWiUriResource> uriCache;
/**
* Cache BNodes by BNode ID
*/
private Map<String,KiWiAnonResource> bnodeCache;
/**
* Cache literals by literal cache key (LiteralCommons#createCacheKey(String,Locale,URI))
*/
private Map<String,KiWiLiteral> literalCache;
/**
* Look up namespaces by URI
*/
private Map<String,KiWiNamespace> namespaceUriCache;
/**
* Look up namespaces by prefix
*/
private Map<String,KiWiNamespace> namespacePrefixCache;
/**
* Cache instances of locales for language tags. The map is static and therefore shared by all
* KiWiConnection instances.
* NOTE(review): a plain HashMap is used here; confirm that accesses are confined or otherwise guarded.
*/
private static Map<String,Locale> localeMap = new HashMap<String, Locale>();
// UTC calendar passed to PreparedStatement.setTimestamp(...) so timestamps are written relative to UTC.
// Static, hence shared by all KiWiConnection instances.
private static Calendar calendarUTC = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
// Per-connection map of prepared statements; created empty in initStatementCache().
// NOTE(review): presumably keyed by the dialect's statement identifier and filled by getPreparedStatement() -- confirm.
private Map<String,PreparedStatement> statementCache;
// Auto-commit mode applied to the JDBC connection when it is first obtained (see requireJDBCConnection()).
private boolean autoCommit = false;
// Overwritten in the constructor with dialect.isBatchSupported(); checked in storeTriple().
private boolean batchCommit = true;
// NOTE(review): presumably set once this connection has been closed -- usage is not visible in this part of the file.
private boolean closed = false;
// NOTE(review): presumably the maximum number of batched triples before a flush -- usage is not visible here, confirm.
private int batchSize = 1000;
// Acquired in storeTriple() when batchCommit is enabled.
private ReentrantLock commitLock;
// Held while the loadLiteral(...) methods run their database lookup.
private ReentrantLock literalLock;
// Held while loadUriResource(...) runs its database lookup.
private ReentrantLock uriLock;
// Held while loadAnonResource(...) runs its database lookup.
private ReentrantLock bnodeLock;
// this set keeps track of all statements that have been deleted in the active transaction of this connection
// this is needed to be able to determine if adding the triple again will merely undo a deletion or is a
// completely new addition to the triple store
// (implemented as a Bloom filter over triple ids, so membership is only answered via mightContain())
private BloomFilter<Long> deletedStatementsLog;
// Static counter shared by all connections.
// NOTE(review): presumably incremented on every commit -- usage is not visible in this part of the file.
private static long numberOfCommits = 0;
// Identifier assigned from getNextSequence() when the connection object is constructed.
private long transactionId;
// Starting (maximum) number of ids per query used by computeBatchSize() for loadNodesByIds().
// NOTE(review): named like a constant but declared as a mutable instance field.
private int QUERY_BATCH_SIZE = 1024;
/**
 * Create a new KiWiConnection on top of the given persistence layer.
 * <p/>
 * The constructor wires up the collaborators, creates the locks and the deletion log, fetches a
 * transaction id from {@code getNextSequence()} and finally initialises the cache references and the
 * statement cache. It does not call {@code requireJDBCConnection()} itself.
 *
 * @param persistence  the persistence layer that provides the JDBC connection and configuration
 * @param dialect      the SQL dialect; also decides whether batch commits are enabled
 * @param cacheManager the cache manager providing the node, triple and namespace caches
 * @throws SQLException propagated from {@code getNextSequence()} or {@code initStatementCache()}
 */
public KiWiConnection(KiWiPersistence persistence, KiWiDialect dialect, CacheManager cacheManager) throws SQLException {
    // collaborators
    this.persistence = persistence;
    this.dialect = dialect;
    this.cacheManager = cacheManager;

    // one lock per kind of guarded operation
    this.bnodeLock = new ReentrantLock();
    this.uriLock = new ReentrantLock();
    this.literalLock = new ReentrantLock();
    this.commitLock = new ReentrantLock();

    this.batchCommit = dialect.isBatchSupported();
    this.deletedStatementsLog = BloomFilter.create(Funnels.longFunnel(), 100000);
    this.transactionId = getNextSequence();

    initCachePool();
    initStatementCache();
}
/**
 * Fetch the caches used by this connection from the cache manager and keep local references to them.
 */
private void initCachePool() {
    // node-level caches
    this.nodeCache = this.cacheManager.getNodeCache();
    this.tripleCache = this.cacheManager.getTripleCache();
    this.uriCache = this.cacheManager.getUriCache();
    this.bnodeCache = this.cacheManager.getBNodeCache();
    this.literalCache = this.cacheManager.getLiteralCache();

    // namespace lookups (by prefix and by URI)
    this.namespacePrefixCache = this.cacheManager.getNamespacePrefixCache();
    this.namespaceUriCache = this.cacheManager.getNamespaceUriCache();
}
/**
 * Create the (initially empty) cache for prepared statements.
 * <p/>
 * The statements of the dialect are NOT prepared eagerly here; the map starts out empty.
 *
 * @throws SQLException declared for callers; the current implementation does not touch the database
 */
private void initStatementCache() throws SQLException {
    this.statementCache = new HashMap<>();
}
/**
 * Make sure a JDBC connection and a triple batch are available.
 * <p/>
 * Every method has to call this as soon as it really needs database access. Deferring the call allows
 * requests that can be answered from the caches to complete without obtaining a JDBC connection.
 *
 * @throws SQLException if the connection cannot be obtained or its auto-commit mode cannot be set
 */
protected void requireJDBCConnection() throws SQLException {
    // lazily obtain the connection and apply the configured auto-commit mode
    if (this.connection == null) {
        this.connection = this.persistence.getJDBCConnection();
        this.connection.setAutoCommit(this.autoCommit);
    }

    // lazily create the in-memory triple batch
    if (this.tripleBatch == null) {
        this.tripleBatch = new TripleTable<KiWiTriple>();
    }
}
/**
 * Give direct access to the JDBC connection wrapped by this KiWiConnection, obtaining it first
 * if that has not happened yet.
 *
 * @return the underlying JDBC connection
 * @throws SQLException if the JDBC connection cannot be obtained
 */
public Connection getJDBCConnection() throws SQLException {
    requireJDBCConnection();
    return this.connection;
}
/**
 * Return the cache manager this connection was created with.
 *
 * @return the cache manager used by this connection
 */
public CacheManager getCacheManager() {
    return this.cacheManager;
}
/**
 * Return the SQL dialect this connection was created with.
 *
 * @return the dialect used by this connection
 */
public KiWiDialect getDialect() {
    return this.dialect;
}
/**
 * Return the configuration of the persistence layer behind this connection.
 *
 * @return the configuration reported by the persistence
 */
public KiWiConfiguration getConfiguration() {
    final KiWiConfiguration configuration = this.persistence.getConfiguration();
    return configuration;
}
/**
 * Look up the namespace registered for the given prefix.
 * <p/>
 * The namespace prefix cache is consulted first. Only on a cache miss is the database queried using
 * the statement "load.namespace_prefix"; a matching row is turned into a KiWiNamespace by
 * {@code constructNamespaceFromDatabase}.
 *
 * @param prefix the prefix to look for
 * @return the KiWiNamespace with this prefix, or null if it does not exist
 * @throws SQLException in case the database query fails
 */
public KiWiNamespace loadNamespaceByPrefix(String prefix) throws SQLException {
    final KiWiNamespace cached = namespacePrefixCache.get(prefix);
    if (cached != null) {
        return cached;
    }

    requireJDBCConnection();

    // a single row is sufficient
    final PreparedStatement lookup = getPreparedStatement("load.namespace_prefix");
    lookup.setString(1, prefix);
    lookup.setMaxRows(1);

    final ResultSet row = lookup.executeQuery();
    try {
        return row.next() ? constructNamespaceFromDatabase(row) : null;
    } finally {
        row.close();
    }
}
/**
 * Look up the namespace registered for the given URI.
 * <p/>
 * The namespace URI cache is consulted first. Only on a cache miss is the database queried using
 * the statement "load.namespace_uri"; a matching row is turned into a KiWiNamespace by
 * {@code constructNamespaceFromDatabase}.
 *
 * @param uri the uri to look for
 * @return the KiWiNamespace with this uri, or null if it does not exist
 * @throws SQLException in case the database query fails
 */
public KiWiNamespace loadNamespaceByUri(String uri) throws SQLException {
    final KiWiNamespace cached = namespaceUriCache.get(uri);
    if (cached != null) {
        return cached;
    }

    requireJDBCConnection();

    // a single row is sufficient
    final PreparedStatement lookup = getPreparedStatement("load.namespace_uri");
    lookup.setString(1, uri);
    lookup.setMaxRows(1);

    final ResultSet row = lookup.executeQuery();
    try {
        return row.next() ? constructNamespaceFromDatabase(row) : null;
    } finally {
        row.close();
    }
}
/**
 * Persist the given namespace and register it in both namespace caches.
 * <p/>
 * A namespace that already carries a non-negative id is considered persisted; in that case a warning
 * is logged and nothing is written. The database might enforce unique constraints and thus raise an
 * exception if the prefix or URI is already in use.
 *
 * @param namespace the namespace to store
 * @throws SQLException the prefix or URI is already used, or a database error occurred
 */
public void storeNamespace(KiWiNamespace namespace) throws SQLException {
    // TODO: add unique constraints to table
    final boolean alreadyPersisted = namespace.getId() >= 0;
    if (alreadyPersisted) {
        log.warn("trying to store namespace which is already persisted: {}",namespace);
        return;
    }

    requireJDBCConnection();

    // assign a fresh id before writing the row
    namespace.setId(getNextSequence());

    final PreparedStatement insert = getPreparedStatement("store.namespace");
    insert.setLong(1, namespace.getId());
    insert.setString(2, namespace.getPrefix());
    insert.setString(3, namespace.getUri());
    // NOTE(review): unlike storeNode(), no UTC calendar is passed here -- confirm this is intended
    insert.setTimestamp(4, new Timestamp(namespace.getCreated().getTime()));
    insert.executeUpdate();

    // make the new namespace visible to subsequent lookups
    namespacePrefixCache.put(namespace.getPrefix(), namespace);
    namespaceUriCache.put(namespace.getUri(), namespace);
}
/**
 * Remove the given namespace from the database and evict it from both namespace caches.
 * <p/>
 * A namespace with a negative id is considered not persisted; in that case a warning is logged and
 * nothing else happens.
 *
 * @param namespace the namespace to delete
 * @throws SQLException in case a database error occurred
 */
public void deleteNamespace(KiWiNamespace namespace) throws SQLException {
    final boolean notPersisted = namespace.getId() < 0;
    if (notPersisted) {
        log.warn("trying to remove namespace which is not persisted: {}",namespace);
        return;
    }

    requireJDBCConnection();

    final PreparedStatement delete = getPreparedStatement("delete.namespace");
    delete.setLong(1, namespace.getId());
    delete.executeUpdate();

    // drop the stale cache entries
    namespacePrefixCache.remove(namespace.getPrefix());
    namespaceUriCache.remove(namespace.getUri());
}
/**
 * Count the triples in the triple store.
 * <p/>
 * The result is the count reported by the "query.size" statement (0 if it yields no row) plus the
 * number of triples currently held in the in-memory triple batch.
 *
 * @return the number of triples in the database plus those in the triple batch
 * @throws SQLException in case the database query fails
 */
public long getSize() throws SQLException {
    requireJDBCConnection();

    final PreparedStatement countQuery = getPreparedStatement("query.size");
    final ResultSet rows = countQuery.executeQuery();
    try {
        final long persisted = rows.next() ? rows.getLong(1) : 0;
        final long batched = (tripleBatch != null) ? tripleBatch.size() : 0;
        return persisted + batched;
    } finally {
        rows.close();
    }
}
/**
 * Count the triples in the given context.
 * <p/>
 * A context with a negative id is treated as not persisted and yields 0 without a database access.
 * Otherwise the result is the count reported by the "query.size_ctx" statement (0 if it yields no row)
 * plus the number of triples of that context currently held in the in-memory triple batch.
 *
 * @param context the context (named graph) whose triples are counted
 * @return the number of triples in the context
 * @throws SQLException in case the database query fails
 */
public long getSize(KiWiResource context) throws SQLException {
    if (context.getId() < 0) {
        return 0;
    }

    requireJDBCConnection();

    final PreparedStatement countQuery = getPreparedStatement("query.size_ctx");
    countQuery.setLong(1, context.getId());

    final ResultSet rows = countQuery.executeQuery();
    try {
        final long persisted = rows.next() ? rows.getLong(1) : 0;
        final long batched = (tripleBatch != null)
                ? tripleBatch.listTriples(null, null, null, context, false).size()
                : 0;
        return persisted + batched;
    } finally {
        rows.close();
    }
}
/**
 * Load a KiWiNode by its database ID.
 * <p/>
 * The node cache is consulted first. On a cache miss the statement 'load.node_by_id' is run against
 * the NODES table and the row is handed to {@code constructNodeFromDatabase}, which builds the
 * appropriate KiWiNode subclass depending on the NTYPE column:
 * <ul>
 * <li>'uri' - KiWiUriResource using the id and svalue (as URI) columns</li>
 * <li>'bnode' - KiWiAnonResource using the id and svalue (as AnonId) columns</li>
 * <li>'string' - KiWiStringLiteral using the id, svalue (literal value), lang (literal
 * language) and ltype (literal type) columns</li>
 * <li>'int' - KiWiIntLiteral using the id, svalue (string value), ivalue (integer value)
 * and ltype (literal type) columns</li>
 * <li>'double' - KiWiDoubleLiteral using the id, svalue (string value), dvalue (double
 * value) and ltype (literal type) columns</li>
 * <li>'boolean' - KiWiBooleanLiteral using the id, svalue (string value), bvalue (boolean
 * value) and ltype (literal type) columns</li>
 * <li>'date' - KiWiDateLiteral using the id, svalue (string value), tvalue (time value)
 * and ltype (literal type) columns</li>
 * </ul>
 * Nodes loaded from the database are expected to be added to the caches by the construction helper
 * so that subsequent requests are faster.
 *
 * @param id the database id of the node to load
 * @return an instance of a KiWiNode subclass representing the node with the given database id, or
 *         null if no such node exists
 * @throws SQLException in case the database query fails
 */
public KiWiNode loadNodeById(long id) throws SQLException {
    final KiWiNode cached = nodeCache.get(id);
    if (cached != null) {
        return cached;
    }

    requireJDBCConnection();

    final PreparedStatement lookup = getPreparedStatement("load.node_by_id");
    // parameter binding and execution happen while holding the statement's monitor
    synchronized (lookup) {
        lookup.setLong(1, id);
        lookup.setMaxRows(1);

        final ResultSet row = lookup.executeQuery();
        try {
            return row.next() ? constructNodeFromDatabase(row) : null;
        } finally {
            row.close();
        }
    }
}
/**
 * Load several nodes at once, reducing the number of database roundtrips.
 * <p/>
 * Ids equal to 0 are skipped (their slot stays null). Every other id is first looked up in the node
 * cache; the ids that are still missing are fetched from the database in chunks whose size is
 * determined by {@code computeBatchSize}, using the statement "load.nodes_by_ids".
 *
 * @param ids array of ids to retrieve
 * @return array of nodes corresponding to these ids (in the same order); slots for which no node was
 *         found remain null
 * @throws SQLException in case a database query fails
 */
public KiWiNode[] loadNodesByIds(long... ids) throws SQLException {
    requireJDBCConnection();

    final KiWiNode[] nodes = new KiWiNode[ids.length];

    // phase 1: serve whatever is possible from the cache and remember the rest
    final ArrayList<Long> missing = new ArrayList<>(ids.length);
    for (int slot = 0; slot < ids.length; slot++) {
        if (ids[slot] == 0) {
            continue;
        }
        nodes[slot] = nodeCache.get(ids[slot]);
        if (nodes[slot] == null) {
            missing.add(ids[slot]);
        }
    }

    // phase 2: fetch the remaining ids chunk by chunk
    int offset = 0;
    while (offset < missing.size()) {
        final int chunk = computeBatchSize(offset, missing.size());
        final PreparedStatement lookup = getPreparedStatement("load.nodes_by_ids", chunk);
        synchronized (lookup) {
            for (int param = 0; param < chunk; param++) {
                lookup.setLong(param + 1, missing.get(offset + param));
            }
            lookup.setMaxRows(chunk);

            final ResultSet rows = lookup.executeQuery();
            try {
                while (rows.next()) {
                    final KiWiNode loaded = constructNodeFromDatabase(rows);
                    // an id may occur more than once in the request, so fill every matching slot
                    for (int slot = 0; slot < ids.length; slot++) {
                        if (ids[slot] == loaded.getId()) {
                            nodes[slot] = loaded;
                        }
                    }
                }
            } finally {
                rows.close();
            }
            offset += chunk;
        }
    }

    return nodes;
}
/**
 * Determine how many of the remaining elements should be fetched by the next query.
 * <p/>
 * Starting from QUERY_BATCH_SIZE, the candidate size is halved until it no longer exceeds the number
 * of remaining elements ({@code length - position}). Restricting the sizes to this halving sequence
 * keeps the number of distinct batch sizes small.
 *
 * @param position index of the first element that has not been fetched yet
 * @param length   total number of elements to fetch
 * @return the size of the next batch; 0 if no elements remain
 * @throws IllegalArgumentException if position lies beyond length (the halving loop could otherwise
 *         never terminate, because the size bottoms out at 0 and stays above a negative remainder)
 */
private int computeBatchSize(int position, int length) {
    final int remaining = length - position;
    if (remaining < 0) {
        throw new IllegalArgumentException("position " + position + " is beyond length " + length);
    }

    int size = QUERY_BATCH_SIZE;
    while (remaining < size) {
        size >>= 1;
    }
    return size;
}
/**
 * Load a KiWiTriple by its database ID.
 * <p/>
 * The triple cache is consulted first. On a cache miss the statement "load.triple_by_id" is run and a
 * matching row is turned into a KiWiTriple by {@code constructTripleFromDatabase}.
 *
 * @param id the database id of the triple to load
 * @return the triple with the given id, or null if it does not exist
 * @throws SQLException in case the database query fails
 */
public KiWiTriple loadTripleById(long id) throws SQLException {
    final KiWiTriple cached = tripleCache.get(id);
    if (cached != null) {
        return cached;
    }

    requireJDBCConnection();

    // a single row is sufficient since the id is unique
    final PreparedStatement lookup = getPreparedStatement("load.triple_by_id");
    lookup.setLong(1, id);
    lookup.setMaxRows(1);

    final ResultSet row = lookup.executeQuery();
    try {
        return row.next() ? constructTripleFromDatabase(row) : null;
    } finally {
        row.close();
    }
}
/**
 * Load a KiWiUriResource by URI.
 * <p/>
 * The URI cache is consulted first. On a cache miss the statement 'load.uri_by_uri' is run against
 * the NODES table while holding the URI lock, and a matching row is turned into a KiWiUriResource by
 * {@code constructNodeFromDatabase}.
 *
 * @param uri the URI of the resource to load; must not be null
 * @return the KiWiUriResource identified by the given URI or null if it does not exist
 * @throws SQLException in case the database query fails
 */
public KiWiUriResource loadUriResource(String uri) throws SQLException {
    Preconditions.checkNotNull(uri);

    final KiWiUriResource cached = uriCache.get(uri);
    if (cached != null) {
        return cached;
    }

    requireJDBCConnection();

    uriLock.lock();
    try {
        // a single row is sufficient
        final PreparedStatement lookup = getPreparedStatement("load.uri_by_uri");
        lookup.setString(1, uri);
        lookup.setMaxRows(1);

        final ResultSet row = lookup.executeQuery();
        try {
            return row.next() ? (KiWiUriResource) constructNodeFromDatabase(row) : null;
        } finally {
            row.close();
        }
    } finally {
        uriLock.unlock();
    }
}
/**
 * Load a KiWiAnonResource by its anonymous ID.
 * <p/>
 * The BNode cache is consulted first. On a cache miss the statement 'load.bnode_by_anonid' is run
 * against the NODES table while holding the BNode lock, and a matching row is turned into a
 * KiWiAnonResource by {@code constructNodeFromDatabase}.
 *
 * @param id the anonymous ID of the resource to load
 * @return the KiWiAnonResource identified by the given internal ID or null if it does not exist
 * @throws SQLException in case the database query fails
 */
public KiWiAnonResource loadAnonResource(String id) throws SQLException {
    final KiWiAnonResource cached = bnodeCache.get(id);
    if (cached != null) {
        return cached;
    }

    requireJDBCConnection();

    bnodeLock.lock();
    try {
        // a single row is sufficient
        final PreparedStatement lookup = getPreparedStatement("load.bnode_by_anonid");
        lookup.setString(1, id);
        lookup.setMaxRows(1);

        final ResultSet row = lookup.executeQuery();
        try {
            return row.next() ? (KiWiAnonResource) constructNodeFromDatabase(row) : null;
        } finally {
            row.close();
        }
    } finally {
        bnodeLock.unlock();
    }
}
/**
 * Load a literal based on the value, language and type passed as argument.
 * <p/>
 * The literal cache is consulted first. On a cache miss one of three statements is run against the
 * NODES table while holding the literal lock:
 * <ul>
 * <li>"load.literal_by_vl" if a language is given (the type is not part of the query in this case)</li>
 * <li>"load.literal_by_vt" if no language but a type is given</li>
 * <li>"load.literal_by_v" if neither language nor type is given</li>
 * </ul>
 * A matching row is turned into a KiWiLiteral by {@code constructNodeFromDatabase}; the concrete
 * literal class is decided there.
 *
 * @param value string value of the literal to load
 * @param lang  language of the literal to load (optional, 2-letter language code with optional country)
 * @param ltype the type of the literal to load (optional)
 * @return the literal matching the given arguments or null if it does not exist; also null if a type
 *         is given that has not been persisted yet (negative id)
 * @throws SQLException in case the database query fails
 */
public KiWiLiteral loadLiteral(String value, String lang, KiWiUriResource ltype) throws SQLException {
    // look in cache
    final KiWiLiteral element = literalCache.get(LiteralCommons.createCacheKey(value,getLocale(lang), ltype));
    if(element != null) {
        return element;
    }
    requireJDBCConnection();

    // a literal whose datatype is not persisted cannot be in the database either
    if(ltype != null && ltype.getId() < 0) {
        return null;
    }

    literalLock.lock();
    try {
        // choose the query by the parameters given; the three branches are exhaustive, so no
        // "impossible combination" fallback is needed
        final PreparedStatement query;
        if(lang != null) {
            query = getPreparedStatement("load.literal_by_vl");
            query.setString(1,value);
            query.setString(2, lang);
        } else if(ltype != null) {
            query = getPreparedStatement("load.literal_by_vt");
            query.setString(1,value);
            query.setLong(2,ltype.getId());
        } else {
            query = getPreparedStatement("load.literal_by_v");
            query.setString(1,value);
        }

        // run the database query and if it yields a result, construct a new node
        ResultSet result = query.executeQuery();
        try {
            if(result.next()) {
                return (KiWiLiteral)constructNodeFromDatabase(result);
            } else {
                return null;
            }
        } finally {
            result.close();
        }
    } finally {
        literalLock.unlock();
    }
}
/**
 * Load the date literal with the given date value if it exists.
 * <p/>
 * The literal cache is consulted first, using the date truncated to full seconds and the xsd:dateTime
 * datatype as key. On a cache miss the xsd:dateTime type resource is resolved; if it is unknown or not
 * persisted, no such literal can exist. Otherwise the statement "load.literal_by_tv" is run against the
 * NODES table while holding the literal lock, with the timestamp (relative to UTC), the zone offset
 * in seconds and the type id as parameters.
 *
 * @param date the date of the date literal to load
 * @return a KiWiDateLiteral with the correct date, or null if it does not exist
 * @throws SQLException in case the database query fails
 */
public KiWiDateLiteral loadLiteral(DateTime date) throws SQLException {
    final String cacheKey = LiteralCommons.createCacheKey(date.withMillisOfSecond(0),Namespaces.NS_XSD + "dateTime");
    final KiWiLiteral cached = literalCache.get(cacheKey);
    if (cached instanceof KiWiDateLiteral) {
        return (KiWiDateLiteral) cached;
    }

    requireJDBCConnection();

    final KiWiUriResource dateTimeType = loadUriResource(Namespaces.NS_XSD + "dateTime");
    if (dateTimeType == null || dateTimeType.getId() < 0) {
        // datatype not persisted
        return null;
    }

    literalLock.lock();
    try {
        final PreparedStatement lookup = getPreparedStatement("load.literal_by_tv");
        lookup.setTimestamp(1, new Timestamp(date.getMillis()), calendarUTC);
        lookup.setInt(2, date.getZone().getOffset(date) / 1000);
        lookup.setLong(3, dateTimeType.getId());

        final ResultSet row = lookup.executeQuery();
        try {
            return row.next() ? (KiWiDateLiteral) constructNodeFromDatabase(row) : null;
        } finally {
            row.close();
        }
    } finally {
        literalLock.unlock();
    }
}
/**
 * Load the integer literal with the given long value if it exists.
 * <p/>
 * The literal cache is consulted first, using the decimal string of the value and the xsd:integer
 * datatype as key. On a cache miss the xsd:integer type resource is resolved; if it is unknown or not
 * persisted, no such literal can exist. Otherwise the statement "load.literal_by_iv" is run against the
 * NODES table while holding the literal lock.
 *
 * @param value the value of the integer literal to load
 * @return a KiWiIntLiteral with the correct value, or null if it does not exist
 * @throws SQLException in case the database query fails
 */
public KiWiIntLiteral loadLiteral(long value) throws SQLException {
    final String cacheKey = LiteralCommons.createCacheKey(Long.toString(value),(String)null,Namespaces.NS_XSD + "integer");
    final KiWiLiteral cached = literalCache.get(cacheKey);
    if (cached instanceof KiWiIntLiteral) {
        return (KiWiIntLiteral) cached;
    }

    requireJDBCConnection();

    final KiWiUriResource integerType = loadUriResource(Namespaces.NS_XSD + "integer");
    if (integerType == null || integerType.getId() < 0) {
        // datatype not persisted
        return null;
    }

    literalLock.lock();
    try {
        final PreparedStatement lookup = getPreparedStatement("load.literal_by_iv");
        lookup.setLong(1, value);
        lookup.setLong(2, integerType.getId());

        final ResultSet row = lookup.executeQuery();
        try {
            return row.next() ? (KiWiIntLiteral) constructNodeFromDatabase(row) : null;
        } finally {
            row.close();
        }
    } finally {
        literalLock.unlock();
    }
}
/**
 * Load the double literal with the given double value if it exists.
 * <p/>
 * The literal cache is consulted first, using {@code Double.toString(value)} and the xsd:double
 * datatype as key. On a cache miss the xsd:double type resource is resolved; if it is unknown or not
 * persisted, no such literal can exist. Otherwise the statement "load.literal_by_dv" is run against the
 * NODES table while holding the literal lock.
 *
 * @param value the value of the double literal to load
 * @return a KiWiDoubleLiteral with the correct value, or null if it does not exist
 * @throws SQLException in case the database query fails
 */
public KiWiDoubleLiteral loadLiteral(double value) throws SQLException {
    // look in cache
    KiWiLiteral element = literalCache.get(LiteralCommons.createCacheKey(Double.toString(value), (String)null,Namespaces.NS_XSD + "double"));
    if(element instanceof KiWiDoubleLiteral) {
        return (KiWiDoubleLiteral)element;
    }
    requireJDBCConnection();
    KiWiUriResource ltype = loadUriResource(Namespaces.NS_XSD + "double");
    // ltype not persisted
    if(ltype == null || ltype.getId() < 0) {
        return null;
    }
    literalLock.lock();
    try {
        PreparedStatement query = getPreparedStatement("load.literal_by_dv");
        query.setDouble(1, value);
        query.setLong(2, ltype.getId());
        // run the database query and if it yields a result, construct a new node
        ResultSet result = query.executeQuery();
        try {
            if (result.next()) {
                return (KiWiDoubleLiteral) constructNodeFromDatabase(result);
            } else {
                return null;
            }
        } catch (RuntimeException e) {
            // Log only what is actually known at this point. The previous implementation dereferenced
            // a local that was always null here, so the handler itself failed with a
            // NullPointerException and hid the original problem.
            log.error("Unable to create KiWiDoubleLiteral for node value '{}': {}", value, e.getMessage(), e);
            throw e;
        } finally {
            result.close();
        }
    } finally {
        literalLock.unlock();
    }
}
/**
 * Load the boolean literal with the given boolean value if it exists.
 * <p/>
 * The literal cache is consulted first, using {@code Boolean.toString(value)} and the xsd:boolean
 * datatype as key. On a cache miss the xsd:boolean type resource is resolved; if it is unknown or not
 * persisted, no such literal can exist. Otherwise the statement "load.literal_by_bv" is run against the
 * NODES table while holding the literal lock.
 *
 * @param value the value of the boolean literal to load
 * @return a KiWiBooleanLiteral with the correct value, or null if it does not exist
 * @throws SQLException in case the database query fails
 */
public KiWiBooleanLiteral loadLiteral(boolean value) throws SQLException {
    final String cacheKey = LiteralCommons.createCacheKey(Boolean.toString(value),(String)null,Namespaces.NS_XSD + "boolean");
    final KiWiLiteral cached = literalCache.get(cacheKey);
    if (cached instanceof KiWiBooleanLiteral) {
        return (KiWiBooleanLiteral) cached;
    }

    requireJDBCConnection();

    final KiWiUriResource booleanType = loadUriResource(Namespaces.NS_XSD + "boolean");
    if (booleanType == null || booleanType.getId() < 0) {
        // datatype not persisted
        return null;
    }

    literalLock.lock();
    try {
        final PreparedStatement lookup = getPreparedStatement("load.literal_by_bv");
        lookup.setBoolean(1, value);
        lookup.setLong(2, booleanType.getId());

        final ResultSet row = lookup.executeQuery();
        try {
            return row.next() ? (KiWiBooleanLiteral) constructNodeFromDatabase(row) : null;
        } finally {
            row.close();
        }
    } finally {
        literalLock.unlock();
    }
}
/**
 * Store a node in the database. If the node does not have a database id yet (negative id), a new one
 * is obtained from {@code getNextSequence()} and set on the passed object. Afterwards the node data is
 * inserted using the INSERT statement matching the node type, and the node is handed to
 * {@code cacheNode}. The caller must make sure the connection is committed and closed properly.
 * <p/>
 * Note that the INSERT is executed regardless of whether the node already carried an id when it was
 * passed in; only the id assignment is skipped in that case.
 * <p/>
 * For literals, a datatype that has not been persisted yet is stored first. Date, integer, double and
 * boolean literals must have a datatype. Nodes of an unrecognized class are not written to the
 * database; a warning is logged instead.
 *
 * @param node the node to store
 * @throws SQLException          in case a database error occurred
 * @throws IllegalStateException if a date, integer, double or boolean literal has no datatype
 */
public synchronized void storeNode(KiWiNode node) throws SQLException {
    // ensure the data type of a literal is persisted first
    if(node instanceof KiWiLiteral) {
        KiWiLiteral literal = (KiWiLiteral)node;
        if(literal.getType() != null && literal.getType().getId() < 0) {
            storeNode(literal.getType());
        }
    }
    requireJDBCConnection();
    // retrieve a new node id and set it in the node object
    if(node.getId() < 0) {
        node.setId(getNextSequence());
    }
    // distinguish the different node types and run the appropriate updates
    if(node instanceof KiWiUriResource) {
        KiWiUriResource uriResource = (KiWiUriResource)node;
        PreparedStatement insertNode = getPreparedStatement("store.uri");
        insertNode.setLong(1,node.getId());
        insertNode.setString(2,uriResource.stringValue());
        insertNode.setTimestamp(3, new Timestamp(uriResource.getCreated().getTime()), calendarUTC);
        insertNode.executeUpdate();
    } else if(node instanceof KiWiAnonResource) {
        KiWiAnonResource anonResource = (KiWiAnonResource)node;
        PreparedStatement insertNode = getPreparedStatement("store.bnode");
        insertNode.setLong(1,node.getId());
        insertNode.setString(2,anonResource.stringValue());
        insertNode.setTimestamp(3, new Timestamp(anonResource.getCreated().getTime()), calendarUTC);
        insertNode.executeUpdate();
    } else if(node instanceof KiWiDateLiteral) {
        KiWiDateLiteral dateLiteral = (KiWiDateLiteral)node;
        PreparedStatement insertNode = getPreparedStatement("store.tliteral");
        insertNode.setLong(1,node.getId());
        insertNode.setString(2, dateLiteral.stringValue());
        insertNode.setTimestamp(3, new Timestamp(dateLiteral.getDateContent().getMillis()), calendarUTC);
        // zone offset is stored in seconds
        insertNode.setInt(4, dateLiteral.getDateContent().getZone().getOffset(dateLiteral.getDateContent())/1000);
        if(dateLiteral.getType() != null) {
            insertNode.setLong(5,dateLiteral.getType().getId());
        } else {
            throw new IllegalStateException("a date literal must have a datatype");
        }
        insertNode.setTimestamp(6, new Timestamp(dateLiteral.getCreated().getTime()), calendarUTC);
        insertNode.executeUpdate();
    } else if(node instanceof KiWiIntLiteral) {
        KiWiIntLiteral intLiteral = (KiWiIntLiteral)node;
        PreparedStatement insertNode = getPreparedStatement("store.iliteral");
        insertNode.setLong(1,node.getId());
        insertNode.setString(2, intLiteral.getContent());
        insertNode.setDouble(3, intLiteral.getDoubleContent());
        insertNode.setLong(4, intLiteral.getIntContent());
        if(intLiteral.getType() != null) {
            insertNode.setLong(5,intLiteral.getType().getId());
        } else {
            throw new IllegalStateException("an integer literal must have a datatype");
        }
        insertNode.setTimestamp(6, new Timestamp(intLiteral.getCreated().getTime()), calendarUTC);
        insertNode.executeUpdate();
    } else if(node instanceof KiWiDoubleLiteral) {
        KiWiDoubleLiteral doubleLiteral = (KiWiDoubleLiteral)node;
        PreparedStatement insertNode = getPreparedStatement("store.dliteral");
        insertNode.setLong(1, node.getId());
        insertNode.setString(2, doubleLiteral.getContent());
        insertNode.setDouble(3, doubleLiteral.getDoubleContent());
        if(doubleLiteral.getType() != null) {
            insertNode.setLong(4,doubleLiteral.getType().getId());
        } else {
            throw new IllegalStateException("a double literal must have a datatype");
        }
        insertNode.setTimestamp(5, new Timestamp(doubleLiteral.getCreated().getTime()), calendarUTC);
        insertNode.executeUpdate();
    } else if(node instanceof KiWiBooleanLiteral) {
        KiWiBooleanLiteral booleanLiteral = (KiWiBooleanLiteral)node;
        PreparedStatement insertNode = getPreparedStatement("store.bliteral");
        insertNode.setLong(1,node.getId());
        insertNode.setString(2, booleanLiteral.getContent());
        insertNode.setBoolean(3, booleanLiteral.booleanValue());
        if(booleanLiteral.getType() != null) {
            insertNode.setLong(4,booleanLiteral.getType().getId());
        } else {
            throw new IllegalStateException("a boolean literal must have a datatype");
        }
        insertNode.setTimestamp(5, new Timestamp(booleanLiteral.getCreated().getTime()), calendarUTC);
        insertNode.executeUpdate();
    } else if(node instanceof KiWiStringLiteral) {
        KiWiStringLiteral stringLiteral = (KiWiStringLiteral)node;
        // numeric interpretations of the string content, stored alongside it when available
        Double dbl_value = null;
        Long lng_value = null;
        if(stringLiteral.getContent().length() < 64 && NumberUtils.isNumber(stringLiteral.getContent())) {
            try {
                dbl_value = Double.parseDouble(stringLiteral.getContent());
                lng_value = Long.parseLong(stringLiteral.getContent());
            } catch (NumberFormatException ignored) {
                // not parseable: whatever has not been assigned so far stays null and is
                // written as SQL NULL below (a parsed double is kept even if the long fails)
            }
        }
        PreparedStatement insertNode = getPreparedStatement("store.sliteral");
        insertNode.setLong(1,node.getId());
        insertNode.setString(2, stringLiteral.getContent());
        if(dbl_value != null) {
            insertNode.setDouble(3, dbl_value);
        } else {
            insertNode.setObject(3, null);
        }
        if(lng_value != null) {
            insertNode.setLong(4, lng_value);
        } else {
            insertNode.setObject(4, null);
        }
        if(stringLiteral.getLocale() != null) {
            // language tags are locale-neutral identifiers; lower-case them independently of the
            // JVM default locale (e.g. a Turkish default would map 'I' to a dotless 'i')
            insertNode.setString(5, stringLiteral.getLocale().getLanguage().toLowerCase(Locale.ROOT));
        } else {
            insertNode.setObject(5, null);
        }
        if(stringLiteral.getType() != null) {
            insertNode.setLong(6,stringLiteral.getType().getId());
        } else {
            insertNode.setObject(6, null);
        }
        insertNode.setTimestamp(7, new Timestamp(stringLiteral.getCreated().getTime()), calendarUTC);
        insertNode.executeUpdate();
    } else {
        log.warn("unrecognized node type: {}", node.getClass().getCanonicalName());
    }
    cacheNode(node);
}
/**
 * Store a triple in the database. This method assumes that all nodes used by the triple are already persisted.
 * <p/>
 * A triple whose id is negative is assigned a new id from {@link #getNextSequence()} first. If the id is
 * reported by the deleted-statements log, the triple is undeleted instead of inserted. Otherwise it is
 * either added to the in-memory triple batch (batch commit mode; the batch is flushed once it reaches the
 * configured batch size) or inserted directly using the "store.triple" statement.
 *
 * @param triple the triple to store
 * @throws SQLException if the database operation fails
 * @throws NullPointerException in direct (non-batch) mode, if the subject, predicate or object of the triple is null
 * @throws ConcurrentModificationException if the insert fails with SQL state HYT00 (H2 table locking timeout)
 */
public synchronized void storeTriple(final KiWiTriple triple) throws SQLException {
    // mutual exclusion: prevent parallel adding and removing of the same triple
    synchronized (triple) {
        requireJDBCConnection();

        if (triple.getId() < 0) {
            triple.setId(getNextSequence());
        }

        if (deletedStatementsLog.mightContain(triple.getId())) {
            // this is a hack for a concurrency problem that may occur in case the triple is removed in the
            // transaction and then added again; in these cases the createStatement method might return
            // an expired state of the triple because it uses its own database connection
            undeleteTriple(triple);
            return;
        }

        if (batchCommit) {
            // only queue the triple; the actual insert happens when the batch is flushed
            commitLock.lock();
            try {
                cacheTriple(triple);
                tripleBatch.add(triple);
                if (tripleBatch.size() >= batchSize) {
                    flushBatch();
                }
            } finally {
                commitLock.unlock();
            }
            return;
        }

        Preconditions.checkNotNull(triple.getSubject().getId());
        Preconditions.checkNotNull(triple.getPredicate().getId());
        Preconditions.checkNotNull(triple.getObject().getId());

        final RetryCommand<Boolean> insertCommand = new RetryCommand<Boolean>() {
            @Override
            public Boolean run() throws SQLException {
                PreparedStatement statement = getPreparedStatement("store.triple");
                statement.setLong(1, triple.getId());
                statement.setLong(2, triple.getSubject().getId());
                statement.setLong(3, triple.getPredicate().getId());
                statement.setLong(4, triple.getObject().getId());
                if (triple.getContext() == null) {
                    statement.setNull(5, Types.BIGINT);
                } else {
                    statement.setLong(5, triple.getContext().getId());
                }
                statement.setBoolean(6, triple.isInferred());
                statement.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));

                int affectedRows = statement.executeUpdate();
                cacheTriple(triple);
                return affectedRows > 0;
            }
        };

        try {
            RetryExecution<Boolean> execution = new RetryExecution<>("STORE");
            execution.setUseSavepoint(true);
            execution.execute(connection, insertCommand);
        } catch (SQLException ex) {
            // H2 table locking timeout
            if ("HYT00".equals(ex.getSQLState())) {
                throw new ConcurrentModificationException("the same triple was modified in concurrent transactions (triple="+triple+")");
            }
            throw ex;
        }
    }
}
/**
 * Return the identifier of the triple with the given subject, predicate, object and context, or -1 if this
 * triple does not exist. Used for quick existence checks of triples.
 * <p/>
 * Triples still waiting in the in-memory triple batch are consulted first; only if none matches is the
 * "load.triple" statement executed against the database.
 *
 * @param subject   the subject of the triple to look up
 * @param predicate the predicate of the triple to look up
 * @param object    the object of the triple to look up
 * @param context   the context of the triple to look up; when null, the context parameter is bound as SQL NULL
 * @return the id of the first matching triple, or -1 if no such triple was found
 * @throws SQLException if the database lookup fails
 */
public synchronized long getTripleId(final KiWiResource subject, final KiWiUriResource predicate, final KiWiNode object, final KiWiResource context) throws SQLException {
    if (tripleBatch != null && tripleBatch.size() > 0) {
        Collection<KiWiTriple> batched = tripleBatch.listTriples(subject, predicate, object, context, false);
        if (!batched.isEmpty()) {
            return batched.iterator().next().getId();
        }
    }

    requireJDBCConnection();
    PreparedStatement loadTripleId = getPreparedStatement("load.triple");
    loadTripleId.setLong(1, subject.getId());
    loadTripleId.setLong(2, predicate.getId());
    loadTripleId.setLong(3, object.getId());
    if (context != null) {
        loadTripleId.setLong(4, context.getId());
    } else {
        loadTripleId.setNull(4, Types.BIGINT);
    }

    // try-with-resources closes the result set without letting a failure in close() hide the
    // exception that caused the lookup to fail in the first place
    try (ResultSet result = loadTripleId.executeQuery()) {
        return result.next() ? result.getLong(1) : -1L;
    }
}
/**
 * Delete the triple passed as argument.
 * <p/>
 * The in-memory triple is always flagged as deleted and stamped with the current time, so that services
 * still holding a reference see the new state. For a persisted triple (id &gt;= 0) the triple is then either
 * dropped from the pending triple batch (batch commit mode, if it was still waiting there) or the
 * "delete.triple" statement is executed and the id is recorded in the deleted-statements log. Finally the
 * triple is evicted from the triple cache. The whole operation runs inside a retryable execution using
 * savepoints.
 * <p/>
 * According to the original documentation of this method the database row is only marked as deleted and
 * remains in the database, because other entities might still reference it (e.g. a version); use
 * cleanupTriples() to fully remove deleted triples without references.
 *
 * @param triple the triple to delete
 * @throws SQLException if the database operation fails
 */
public void deleteTriple(final KiWiTriple triple) throws SQLException {
    requireJDBCConnection();

    final RetryCommand<Void> deleteCommand = new RetryCommand<Void>() {
        @Override
        public Void run() throws SQLException {
            // mutual exclusion: prevent parallel adding and removing of the same triple
            synchronized (triple) {
                // make sure the triple is marked as deleted in case some service still holds a reference
                triple.setDeleted(true);
                triple.setDeletedAt(new Date());

                if (triple.getId() < 0) {
                    log.warn("attempting to remove non-persistent triple: {}", triple);
                } else if (batchCommit) {
                    commitLock.lock();
                    try {
                        // a triple that was still waiting in the batch never reached the database
                        final boolean removedFromBatch = tripleBatch != null && tripleBatch.remove(triple);
                        if (!removedFromBatch) {
                            PreparedStatement statement = getPreparedStatement("delete.triple");
                            synchronized (statement) {
                                statement.setLong(1, triple.getId());
                                statement.executeUpdate();
                            }
                            deletedStatementsLog.put(triple.getId());
                        }
                    } finally {
                        commitLock.unlock();
                    }
                } else {
                    requireJDBCConnection();
                    PreparedStatement statement = getPreparedStatement("delete.triple");
                    synchronized (statement) {
                        statement.setLong(1, triple.getId());
                        statement.executeUpdate();
                    }
                    deletedStatementsLog.put(triple.getId());
                }

                removeCachedTriple(triple);
            }
            return null;
        }
    };

    final RetryExecution<Void> execution = new RetryExecution<>("DELETE");
    execution.setUseSavepoint(true);
    execution.execute(connection, deleteCommand);
}
/**
 * Delete all triples contained in the context passed as argument.
 * <p/>
 * In batch commit mode, triples of the context that are still waiting in the in-memory triple batch are
 * removed from the batch; in every mode the "delete.context" statement is executed so that triples which
 * have already been written to the database are deleted as well. Afterwards the complete triple cache is
 * cleared.
 * <p/>
 * According to the original documentation of this method the database rows are only marked as deleted and
 * remain in the database, because other entities might still reference them (e.g. a version); use
 * cleanupTriples() to fully remove deleted triples without references.
 * <p/>
 * Warning: this method skips some concurrency and transaction safeguards for performance and therefore should
 * only be called if run in an isolated transaction!
 *
 * @param ctx resource identifying the context to be deleted
 * @throws SQLException if the database operation fails
 */
public void deleteContext(final KiWiResource ctx) throws SQLException {
    requireJDBCConnection();
    RetryExecution<Void> execution = new RetryExecution<>("DELETE");
    execution.setUseSavepoint(true);
    execution.execute(connection, new RetryCommand<Void>() {
        @Override
        public Void run() throws SQLException {
            if (ctx.getId() < 0) {
                log.warn("attempting to remove non-persistent context: {}", ctx);
            } else if (batchCommit) {
                // need to remove from triple batch and from database
                commitLock.lock();
                try {
                    if (tripleBatch != null) {
                        for (Iterator<KiWiTriple> it = tripleBatch.iterator(); it.hasNext(); ) {
                            // call equals on ctx: batched triples may have no context at all
                            if (ctx.equals(it.next().getContext())) {
                                it.remove();
                            }
                        }
                    }
                    // a non-empty batch does not mean the context has no triples in the database,
                    // so the database delete must not be skipped
                    deleteContextInDatabase();
                } finally {
                    commitLock.unlock();
                }
            } else {
                requireJDBCConnection();
                deleteContextInDatabase();
            }

            // that's radical but safe, and the improved delete performance might be worth it
            tripleCache.clear();
            return null;
        }

        /** Execute the "delete.context" statement for the context being deleted. */
        private void deleteContextInDatabase() throws SQLException {
            PreparedStatement deleteTriple = getPreparedStatement("delete.context");
            synchronized (deleteTriple) {
                deleteTriple.setLong(1, ctx.getId());
                deleteTriple.executeUpdate();
            }
        }
    });
}
/**
 * Delete all triples contained in the triple store.
 * <p/>
 * In batch commit mode the in-memory triple batch is emptied; in every mode the "delete.repository"
 * statement is executed so that triples which have already been written to the database are deleted as
 * well. Afterwards the complete triple cache is cleared.
 * <p/>
 * According to the original documentation of this method the database rows are only marked as deleted and
 * remain in the database, because other entities might still reference them (e.g. a version); use
 * cleanupTriples() to fully remove deleted triples without references.
 * <p/>
 * Warning: this method skips some concurrency and transaction safeguards for performance and therefore should
 * only be called if run in an isolated transaction!
 *
 * @throws SQLException if the database operation fails
 */
public void deleteAll() throws SQLException {
    requireJDBCConnection();
    RetryExecution<Void> execution = new RetryExecution<>("DELETE");
    execution.setUseSavepoint(true);
    execution.execute(connection, new RetryCommand<Void>() {
        @Override
        public Void run() throws SQLException {
            if (batchCommit) {
                // need to remove from triple batch and from database
                commitLock.lock();
                try {
                    if (tripleBatch != null) {
                        tripleBatch.clear();
                    }
                    // pending batch entries say nothing about what is already stored, so the
                    // database delete must not be skipped when the batch was non-empty
                    deleteRepositoryInDatabase();
                } finally {
                    commitLock.unlock();
                }
            } else {
                requireJDBCConnection();
                deleteRepositoryInDatabase();
            }

            // that's radical but safe, and the improved delete performance might be worth it
            tripleCache.clear();
            return null;
        }

        /** Execute the "delete.repository" statement. */
        private void deleteRepositoryInDatabase() throws SQLException {
            PreparedStatement deleteTriple = getPreparedStatement("delete.repository");
            synchronized (deleteTriple) {
                deleteTriple.executeUpdate();
            }
        }
    });
}
/**
 * Mark the triple passed as argument as not deleted: the in-memory "deleted" flag is reset, the
 * "deletedAt" timestamp is cleared and the "undelete.triple" statement is executed for the triple's id.
 * Unless the persistence is configured as clustered, the triple is put back into the triple cache.
 * <p/>
 * Note that this operation should only be called if the triple was deleted before in the same
 * transaction! A warning is logged if the triple passed in is not flagged as deleted.
 *
 * @param triple the triple to undelete; triples with a negative id are ignored with a warning
 * @throws SQLException if the database operation fails
 */
public void undeleteTriple(KiWiTriple triple) throws SQLException {
    if (triple.getId() < 0) {
        log.warn("attempting to undelete non-persistent triple: {}", triple);
        return;
    }
    requireJDBCConnection();

    synchronized (triple) {
        // check the flag BEFORE resetting it; otherwise the warning would be logged on every call
        if (!triple.isDeleted()) {
            log.warn("attempting to undelete triple that was not deleted: {}", triple);
        }

        // make sure the triple is not marked as deleted in case some service still holds a reference
        triple.setDeleted(false);
        triple.setDeletedAt(null);

        PreparedStatement undeleteTriple = getPreparedStatement("undelete.triple");
        undeleteTriple.setLong(1, triple.getId());
        undeleteTriple.executeUpdate();

        if (!persistence.getConfiguration().isClustered()) {
            cacheTriple(triple);
        }
    }
}
/**
 * List all contexts used in this triple store. The contexts are read from the database using the
 * "query.contexts" statement; if the in-memory triple batch currently holds triples, the context ids of
 * the batch are merged in and duplicates are filtered out.
 *
 * @return an iteration over the contexts; it wraps an open database result set
 * @throws SQLException if the database query fails
 */
public CloseableIteration<KiWiResource, SQLException> listContexts() throws SQLException {
    requireJDBCConnection();

    final ResultSet result = getPreparedStatement("query.contexts").executeQuery();

    // contexts found in the database; each row carries the node id of a context
    final CloseableIteration<KiWiResource, SQLException> storedContexts =
            new ResultSetIteration<KiWiResource>(result, new ResultTransformerFunction<KiWiResource>() {
                @Override
                public KiWiResource apply(ResultSet row) throws SQLException {
                    return (KiWiResource)loadNodeById(result.getLong("context"));
                }
            });

    if (tripleBatch != null && tripleBatch.size() > 0) {
        // contexts of triples that are still waiting in the batch
        final CloseableIteration<KiWiResource, SQLException> batchedContexts =
                new ConvertingIteration<Resource,KiWiResource,SQLException>(new IteratorIteration<Resource, SQLException>(tripleBatch.listContextIDs().iterator())) {
                    @Override
                    protected KiWiResource convert(Resource sourceObject) throws SQLException {
                        return (KiWiResource)sourceObject;
                    }
                };

        return new DistinctIteration<KiWiResource, SQLException>(
                new UnionIteration<KiWiResource, SQLException>(batchedContexts, storedContexts)
        );
    }

    return storedContexts;
}
/**
 * List the resources returned by the "query.resources" statement. Every result row is turned into a
 * node by {@link #constructNodeFromDatabase(ResultSet)}.
 *
 * @return an iteration over the resources; it wraps an open database result set
 * @throws SQLException if the database query fails
 */
public CloseableIteration<KiWiResource, SQLException> listResources() throws SQLException {
    requireJDBCConnection();

    final PreparedStatement queryResources = getPreparedStatement("query.resources");
    final ResultSet rows = queryResources.executeQuery();

    final ResultTransformerFunction<KiWiResource> toResource = new ResultTransformerFunction<KiWiResource>() {
        @Override
        public KiWiResource apply(ResultSet row) throws SQLException {
            return (KiWiResource)constructNodeFromDatabase(row);
        }
    };
    return new ResultSetIteration<KiWiResource>(rows, toResource);
}
/**
 * List the URI resources returned by the "query.resources_prefix" statement for the given prefix. The
 * statement parameter is the prefix followed by a "%" wildcard. Every result row is turned into a node by
 * {@link #constructNodeFromDatabase(ResultSet)}.
 *
 * @param prefix the prefix to search for
 * @return an iteration over the matching URI resources; it wraps an open database result set
 * @throws SQLException if the database query fails
 */
public CloseableIteration<KiWiUriResource, SQLException> listResources(String prefix) throws SQLException {
    requireJDBCConnection();

    final PreparedStatement queryByPrefix = getPreparedStatement("query.resources_prefix");
    queryByPrefix.setString(1, prefix + "%");
    final ResultSet rows = queryByPrefix.executeQuery();

    final ResultTransformerFunction<KiWiUriResource> toUriResource = new ResultTransformerFunction<KiWiUriResource>() {
        @Override
        public KiWiUriResource apply(ResultSet row) throws SQLException {
            return (KiWiUriResource)constructNodeFromDatabase(row);
        }
    };
    return new ResultSetIteration<KiWiUriResource>(rows, toUriResource);
}
/**
 * List the namespaces returned by the "query.namespaces" statement. Every result row is turned into a
 * namespace by {@link #constructNamespaceFromDatabase(ResultSet)}.
 *
 * @return an iteration over the namespaces; it wraps an open database result set
 * @throws SQLException if the database query fails
 */
public CloseableIteration<KiWiNamespace, SQLException> listNamespaces() throws SQLException {
    requireJDBCConnection();

    final PreparedStatement queryNamespaces = getPreparedStatement("query.namespaces");
    final ResultSet result = queryNamespaces.executeQuery();

    final ResultTransformerFunction<KiWiNamespace> toNamespace = new ResultTransformerFunction<KiWiNamespace>() {
        @Override
        public KiWiNamespace apply(ResultSet input) throws SQLException {
            // reads from the captured result set, exactly as the query produced it
            return constructNamespaceFromDatabase(result);
        }
    };
    return new ResultSetIteration<KiWiNamespace>(result, toNamespace);
}
/**
 * Return a Sesame RepositoryResult of statements matching the query pattern given in the arguments. Each of
 * the parameters subject, predicate, object and context may be null, indicating a wildcard query.
 * <p/>
 * If the in-memory triple batch currently holds triples, the matching batched triples are returned first,
 * followed by the database results; the database query is then only issued once the iteration reaches it.
 * Otherwise the database query is issued immediately.
 * <p/>
 * The RepositoryResult holds a direct connection to the database and needs to be closed properly, or otherwise
 * the system might run out of resources.
 *
 * @param subject         the subject to query for, or null for a wildcard query
 * @param predicate       the predicate to query for, or null for a wildcard query
 * @param object          the object to query for, or null for a wildcard query
 * @param context         the context to query for, or null for a wildcard query
 * @param inferred        if true, the result will also contain triples inferred by the reasoner, if false not
 * @param wildcardContext if true, a null context will be interpreted as a wildcard, if false, a null context will be interpreted as "no context"
 * @return a new RepositoryResult with a direct connection to the database; the result should be properly closed
 *         by the caller
 * @throws SQLException if the database query fails
 */
public RepositoryResult<Statement> listTriples(final KiWiResource subject, final KiWiUriResource predicate, final KiWiNode object, final KiWiResource context, final boolean inferred, final boolean wildcardContext) throws SQLException {
    final Iteration<? extends Statement, ? extends SQLException> statements;

    if (tripleBatch != null && tripleBatch.size() > 0) {
        synchronized (tripleBatch) {
            statements = new UnionIteration<Statement, SQLException>(
                    new IteratorIteration<Statement, SQLException>(tripleBatch.listTriples(subject,predicate,object,context, wildcardContext).iterator()),
                    new DelayedIteration<Statement, SQLException>() {
                        @Override
                        protected Iteration<? extends Statement, ? extends SQLException> createIteration() throws SQLException {
                            return listTriplesInternal(subject,predicate,object,context,inferred, wildcardContext);
                        }
                    }
            );
        }
    } else {
        statements = listTriplesInternal(subject,predicate,object,context,inferred, wildcardContext);
    }

    // translate low-level exceptions into the RepositoryException expected by Sesame
    return new RepositoryResult<Statement>(
            new ExceptionConvertingIteration<Statement, RepositoryException>(statements) {
                @Override
                protected RepositoryException convert(Exception e) {
                    return new RepositoryException("database error while iterating over result set",e);
                }
            }
    );
}
/**
 * Internal implementation for actually carrying out the query. Returns a closable iteration that can be used
 * in a repository result. The iteration is forward-only and does not allow removing result rows; triples
 * are fetched from the result set in batches of QUERY_BATCH_SIZE rows.
 * <p/>
 * The SQL statement is prepared for every call. It is closed together with the result set when the
 * returned iteration is closed, or immediately if preparing or executing the query fails.
 *
 * @param subject         the subject to query for, or null for a wildcard query
 * @param predicate       the predicate to query for, or null for a wildcard query
 * @param object          the object to query for, or null for a wildcard query
 * @param context         the context to query for, or null for a wildcard query
 * @param inferred        if true, the result will also contain triples inferred by the reasoner, if false not
 * @param wildcardContext if true, a null context will be interpreted as a wildcard, if false, a null context will be interpreted as "no context"
 * @return a ClosableIteration that wraps the database ResultSet; needs to be closed explicitly by the caller
 * @throws SQLException if the query cannot be prepared or executed
 */
private CloseableIteration<Statement, SQLException> listTriplesInternal(KiWiResource subject, KiWiUriResource predicate, KiWiNode object, KiWiResource context, boolean inferred, final boolean wildcardContext) throws SQLException {
    // a node with a negative id has not been persisted, so there cannot be any database results for it
    if(subject != null && subject.getId() < 0) {
        return new EmptyIteration<Statement, SQLException>();
    }
    if(predicate != null && predicate.getId() < 0) {
        return new EmptyIteration<Statement, SQLException>();
    }
    if(object != null && object.getId() < 0) {
        return new EmptyIteration<Statement, SQLException>();
    }
    if(context != null && context.getId() < 0) {
        return new EmptyIteration<Statement, SQLException>();
    }

    requireJDBCConnection();

    // otherwise we need to create an appropriate SQL query and execute it, the repository result will be read-only
    // and only allow forward iteration, so we can limit the query using the respective flags
    final PreparedStatement query = connection.prepareStatement(
            constructTripleQuery(subject,predicate,object,context,inferred, wildcardContext),
            ResultSet.TYPE_FORWARD_ONLY,
            ResultSet.CONCUR_READ_ONLY
    );

    final ResultSet result;
    try {
        query.clearParameters();
        if(persistence.getDialect().isCursorSupported()) {
            query.setFetchSize(persistence.getConfiguration().getCursorSize());
        }

        // set query parameters; the order must match the placeholders emitted by constructTripleQuery
        int position = 1;
        if(subject != null) {
            query.setLong(position++, subject.getId());
        }
        if(predicate != null) {
            query.setLong(position++, predicate.getId());
        }
        if(object != null) {
            query.setLong(position++, object.getId());
        }
        if(context != null) {
            query.setLong(position++, context.getId());
        }

        result = query.executeQuery();
    } catch (SQLException | RuntimeException ex) {
        // nobody will ever get hold of the statement, so release it here
        try {
            query.close();
        } catch (SQLException closeFailure) {
            ex.addSuppressed(closeFailure);
        }
        throw ex;
    }

    return new CloseableIteration<Statement, SQLException>() {
        List<KiWiTriple> batch = null;
        int batchPosition = 0;

        @Override
        public void close() throws SQLException {
            // the statement was created for this query only; release it together with its result set
            try {
                result.close();
            } finally {
                query.close();
            }
        }

        @Override
        public boolean hasNext() throws SQLException {
            fetchBatch();
            return batch.size() > batchPosition;
        }

        /** Returns the next triple, or null if the result is exhausted; check hasNext() first. */
        @Override
        public Statement next() throws SQLException {
            fetchBatch();
            if(batch.size() > batchPosition) {
                return batch.get(batchPosition++);
            } else {
                return null;
            }
        }

        /** Load the next batch of triples once the current one has been consumed completely. */
        private void fetchBatch() throws SQLException {
            if(batch == null || batch.size() <= batchPosition) {
                batch = constructTriplesFromDatabase(result, QUERY_BATCH_SIZE);
                batchPosition = 0;
            }
        }

        @Override
        public void remove() throws SQLException {
            throw new UnsupportedOperationException("removing results not supported");
        }
    };
}
/**
 * Build the SQL query string for a triple pattern. The query always selects non-deleted triples; for every
 * non-null pattern component a "column = ?" condition is appended, in the order subject, predicate, object,
 * context.
 *
 * @param subject         the subject to query for, or null for a wildcard query
 * @param predicate       the predicate to query for, or null for a wildcard query
 * @param object          the object to query for, or null for a wildcard query
 * @param context         the context to query for, or null (see wildcardContext)
 * @param inferred        if true, the result will also contain triples inferred by the reasoner, if false not
 * @param wildcardContext if true, a null context adds no condition; if false, a null context restricts the
 *                        query to triples whose context is NULL
 * @return an SQL query string representing the triple pattern
 */
protected String constructTripleQuery(KiWiResource subject, KiWiUriResource predicate, KiWiNode object, KiWiResource context, boolean inferred, boolean wildcardContext) {
    final StringBuilder sql = new StringBuilder("SELECT id,subject,predicate,object,context,deleted,inferred,creator,createdAt,deletedAt FROM triples WHERE deleted = false");

    if (subject != null) {
        sql.append(" AND subject = ?");
    }
    if (predicate != null) {
        sql.append(" AND predicate = ?");
    }
    if (object != null) {
        sql.append(" AND object = ?");
    }

    final boolean contextGiven = context != null;
    if (contextGiven) {
        sql.append(" AND context = ?");
    } else if (!wildcardContext) {
        sql.append(" AND context IS NULL");
    }

    if (!inferred) {
        sql.append(" AND inferred = false");
    }

    return sql.toString();
}
/**
 * Construct a KiWiNamespace from the current row of a result set. The row is expected to provide the
 * columns "prefix", "uri", "id" and "createdAt". The namespace is added to the prefix cache and to the
 * URI cache unless an entry for its prefix (respectively URI) is already present.
 *
 * @param row the result set positioned on the row to read
 * @return the namespace built from the row
 * @throws SQLException if reading a column fails
 */
protected KiWiNamespace constructNamespaceFromDatabase(ResultSet row) throws SQLException {
    final String prefix = row.getString("prefix");
    final String uri = row.getString("uri");

    final KiWiNamespace namespace = new KiWiNamespace(prefix, uri);
    namespace.setId(row.getLong("id"));
    namespace.setCreated(new Date(row.getTimestamp("createdAt").getTime()));

    // existing cache entries win over the freshly loaded instance
    final boolean prefixKnown = namespacePrefixCache.containsKey(namespace.getPrefix());
    if (!prefixKnown) {
        namespacePrefixCache.put(namespace.getPrefix(), namespace);
    }
    final boolean uriKnown = namespaceUriCache.containsKey(namespace.getUri());
    if (!uriKnown) {
        namespaceUriCache.put(namespace.getUri(), namespace);
    }

    return namespace;
}
/**
 * Construct an appropriate KiWiNode from the current row of an SQL query result. The method does not move
 * the ResultSet cursor, it only reads values, so it needs to be executed for each row separately.
 * <p/>
 * The node cache is consulted first using the id in column 1; a cached node is returned as is. Otherwise
 * the node type in column 2 decides which kind of node is created; the new node is added to the cache.
 * For literals a non-zero value in the datatype column is resolved via loadNodeById.
 *
 * @param row the result set positioned on the row to read; expected column order is
 *            id, ntype, svalue, ivalue, dvalue, tvalue, tzoffset, bvalue, lang, ltype, createdAt
 * @return the cached or newly constructed node
 * @throws SQLException if reading a column or loading a datatype node fails
 * @throws IllegalArgumentException if the node type of the row is not one of the known types
 */
protected KiWiNode constructNodeFromDatabase(ResultSet row) throws SQLException {
    // column order; id,ntype,svalue,ivalue,dvalue,tvalue,tzoffset,bvalue,lang,ltype,createdAt
    //               1 ,2    ,3     ,4     ,5     ,6     ,7       ,8     ,9   ,10   ,11
    final long id = row.getLong(1);

    // lookup element in cache first, so we can avoid reconstructing it if it is already there
    final KiWiNode known = nodeCache.get(id);
    if (known != null) {
        return known;
    }

    final String ntype = row.getString(2);

    // a missing type is routed to the default branch, like any other unknown type
    switch (ntype == null ? "" : ntype) {
        case "uri": {
            KiWiUriResource uri = new KiWiUriResource(row.getString(3), new Date(row.getTimestamp(11, calendarUTC).getTime()));
            uri.setId(id);
            cacheNode(uri);
            return uri;
        }
        case "bnode": {
            KiWiAnonResource bnode = new KiWiAnonResource(row.getString(3), new Date(row.getTimestamp(11, calendarUTC).getTime()));
            bnode.setId(id);
            cacheNode(bnode);
            return bnode;
        }
        case "string": {
            final KiWiStringLiteral literal = new KiWiStringLiteral(row.getString(3), new Date(row.getTimestamp(11, calendarUTC).getTime()));
            literal.setId(id);
            final String language = row.getString(9);
            if (language != null) {
                literal.setLocale(getLocale(language));
            }
            final long typeId = row.getLong(10);
            if (typeId != 0) {
                literal.setType((KiWiUriResource) loadNodeById(typeId));
            }
            cacheNode(literal);
            return literal;
        }
        case "int": {
            KiWiIntLiteral literal = new KiWiIntLiteral(row.getLong(4), null, new Date(row.getTimestamp(11, calendarUTC).getTime()));
            literal.setId(id);
            final long typeId = row.getLong(10);
            if (typeId != 0) {
                literal.setType((KiWiUriResource) loadNodeById(typeId));
            }
            cacheNode(literal);
            return literal;
        }
        case "double": {
            KiWiDoubleLiteral literal = new KiWiDoubleLiteral(row.getDouble(5), null, new Date(row.getTimestamp(11, calendarUTC).getTime()));
            literal.setId(id);
            final long typeId = row.getLong(10);
            if (typeId != 0) {
                literal.setType((KiWiUriResource) loadNodeById(typeId));
            }
            cacheNode(literal);
            return literal;
        }
        case "boolean": {
            KiWiBooleanLiteral literal = new KiWiBooleanLiteral(row.getBoolean(8),null,new Date(row.getTimestamp(11, calendarUTC).getTime()));
            literal.setId(id);
            final long typeId = row.getLong(10);
            if (typeId != 0) {
                literal.setType((KiWiUriResource) loadNodeById(typeId));
            }
            cacheNode(literal);
            return literal;
        }
        case "date": {
            KiWiDateLiteral literal = new KiWiDateLiteral();
            literal.setCreated(new Date(row.getTimestamp(11, calendarUTC).getTime()));
            // column 7 is multiplied by 1000 to obtain the millisecond offset expected by joda-time
            DateTime dvalue = new DateTime(row.getTimestamp(6, calendarUTC).getTime(), DateTimeZone.forOffsetMillis(row.getInt(7)*1000));
            final long typeId = row.getLong(10);
            if (typeId != 0) {
                literal.setType((KiWiUriResource) loadNodeById(typeId));
            }
            literal.setId(id);
            literal.setDateContent(dvalue);
            cacheNode(literal);
            return literal;
        }
        default:
            throw new IllegalArgumentException("unknown node type in database result for node id " + id + ": " + ntype);
    }
}
/**
 * Construct a KiWiTriple from the current row of an SQL query result. The row is expected to contain the
 * following columns, in this order:
 * <ol>
 * <li>id: the database id of the triple (long value)</li>
 * <li>subject: the database id of the subject (long value)</li>
 * <li>predicate: the database id of the predicate (long value)</li>
 * <li>object: the database id of the object (long value)</li>
 * <li>context: the database id of the context (long value)</li>
 * <li>deleted: a flag (boolean) indicating whether this triple has been deleted</li>
 * <li>inferred: a flag (boolean) indicating whether this triple has been inferred by the KiWi reasoner</li>
 * <li>creator: the database id of the creator (long value); only loaded when non-zero</li>
 * <li>createdAt: a timestamp representing the creation date of the triple</li>
 * <li>deletedAt: a timestamp representing the deletion date of the triple (null in case triple is not deleted)</li>
 * </ol>
 * Subject, predicate, object and context are loaded together through loadNodesByIds, the creator through
 * loadNodeById. The triple cache is consulted first; a newly constructed triple is added to the cache.
 * The method will not change the ResultSet iterator, only read its values, so it needs to be executed for each row separately.
 *
 * @param row a database result containing the columns described above
 * @return a KiWiTriple representation of the database result
 * @throws ResultInterruptedException if the result set has already been closed
 * @throws SQLException if reading a column or loading a node fails
 */
protected KiWiTriple constructTripleFromDatabase(ResultSet row) throws SQLException {
    if (row.isClosed()) {
        throw new ResultInterruptedException("retrieving results has been interrupted");
    }

    // columns: id,subject,predicate,object,context,deleted,inferred,creator,createdAt,deletedAt
    //          1 ,2      ,3        ,4     ,5      ,6      ,7       ,8      ,9        ,10
    final long id = row.getLong(1);

    // lookup element in cache first, so we can avoid reconstructing it if it is already there
    final KiWiTriple known = tripleCache.get(id);
    if (known != null) {
        return known;
    }

    final KiWiTriple triple = new KiWiTriple();
    triple.setId(id);

    // one batched lookup for subject, predicate, object and context
    final KiWiNode[] nodes = loadNodesByIds(row.getLong(2), row.getLong(3), row.getLong(4), row.getLong(5));
    triple.setSubject((KiWiResource) nodes[0]);
    triple.setPredicate((KiWiUriResource) nodes[1]);
    triple.setObject(nodes[2]);
    triple.setContext((KiWiResource) nodes[3]);

    final long creatorId = row.getLong(8);
    if (creatorId != 0) {
        triple.setCreator((KiWiResource)loadNodeById(creatorId));
    }

    triple.setDeleted(row.getBoolean(6));
    triple.setInferred(row.getBoolean(7));
    triple.setCreated(new Date(row.getTimestamp(9).getTime()));

    try {
        if (row.getDate(10) != null) {
            triple.setDeletedAt(new Date(row.getTimestamp(10).getTime()));
        }
    } catch (SQLException ex) {
        // deliberately ignored: work around a MySQL problem with null dates
        // (see http://stackoverflow.com/questions/782823/handling-datetime-values-0000-00-00-000000-in-jdbc)
    }

    cacheTriple(triple);
    return triple;
}
/**
 * Construct a batch of KiWiTriples from the result of an SQL query. In contrast to
 * constructTripleFromDatabase this method advances the result set itself (by at most maxPrefetch rows)
 * and loads the nodes of all newly constructed triples with a single loadNodesByIds call.
 * <p/>
 * Triples found in the triple cache are reused; all returned triples are (re-)added to the cache.
 *
 * @param row         a database result with the columns id, subject, predicate, object, context, deleted,
 *                    inferred, creator, createdAt, deletedAt (in this order)
 * @param maxPrefetch the maximum number of rows to read from the result set
 * @return the triples constructed from the rows that were read; empty if the result set is exhausted
 * @throws ResultInterruptedException if the result set turns out to be closed while reading
 * @throws SQLException if reading a column or loading the nodes fails
 */
protected List<KiWiTriple> constructTriplesFromDatabase(ResultSet row, int maxPrefetch) throws SQLException {
    final List<KiWiTriple> triples = new ArrayList<>();
    // triple id -> { subject, predicate, object, context, creator } for triples that still need their nodes
    final Map<Long,Long[]> pendingNodeIds = new HashMap<>();
    final Set<Long> nodeIds = new HashSet<>();

    for (int fetched = 0; fetched < maxPrefetch && row.next(); fetched++) {
        if (row.isClosed()) {
            throw new ResultInterruptedException("retrieving results has been interrupted");
        }

        // columns: id,subject,predicate,object,context,deleted,inferred,creator,createdAt,deletedAt
        //          1 ,2      ,3        ,4     ,5      ,6      ,7       ,8      ,9        ,10
        final long id = row.getLong(1);

        // lookup element in cache first, so we can avoid reconstructing it if it is already there
        final KiWiTriple cached = tripleCache.get(id);
        if (cached != null) {
            triples.add(cached);
            continue;
        }

        final KiWiTriple fresh = new KiWiTriple();
        fresh.setId(id);

        final long subjectId = row.getLong(2);
        final long predicateId = row.getLong(3);
        final long objectId = row.getLong(4);
        final long contextId = row.getLong(5);
        final long creatorId = row.getLong(8);

        // collect node ids for batch retrieval; 0 means "not set" for context and creator
        nodeIds.add(subjectId);
        nodeIds.add(predicateId);
        nodeIds.add(objectId);
        if (contextId != 0) {
            nodeIds.add(contextId);
        }
        if (creatorId != 0) {
            nodeIds.add(creatorId);
        }

        // remember which node ids were relevant for the triple
        pendingNodeIds.put(id, new Long[] { subjectId, predicateId, objectId, contextId, creatorId });

        fresh.setDeleted(row.getBoolean(6));
        fresh.setInferred(row.getBoolean(7));
        fresh.setCreated(new Date(row.getTimestamp(9).getTime()));
        try {
            if (row.getDate(10) != null) {
                fresh.setDeletedAt(new Date(row.getTimestamp(10).getTime()));
            }
        } catch (SQLException ex) {
            // deliberately ignored: work around a MySQL problem with null dates
            // (see http://stackoverflow.com/questions/782823/handling-datetime-values-0000-00-00-000000-in-jdbc)
        }

        triples.add(fresh);
    }

    // resolve all collected node ids in one go and index the nodes by id
    final Map<Long,KiWiNode> nodesById = new HashMap<>();
    for (KiWiNode node : loadNodesByIds(Longs.toArray(nodeIds))) {
        nodesById.put(node.getId(), node);
    }

    for (KiWiTriple t : triples) {
        final Long[] ids = pendingNodeIds.get(t.getId());
        if (ids != null) {
            // need to set subject, predicate, object, context and creator
            t.setSubject((KiWiResource) nodesById.get(ids[0]));
            t.setPredicate((KiWiUriResource) nodesById.get(ids[1]));
            t.setObject(nodesById.get(ids[2]));
            if (ids[3] != 0) {
                t.setContext((KiWiResource) nodesById.get(ids[3]));
            }
            if (ids[4] != 0) {
                t.setCreator((KiWiResource) nodesById.get(ids[4]));
            }
        }
        cacheTriple(t);
    }

    return triples;
}
/**
 * Return the Locale for the given language tag. Locales are looked up in the locale map first; a tag that
 * is not yet known is parsed with {@link Locale.Builder#setLanguageTag(String)} and remembered in the map.
 *
 * @param language the language tag to resolve
 * @return the matching locale; for a null or empty tag, whatever the locale map holds for it (possibly null)
 * @throws IllegalArgumentException if the tag is not a well-formed BCP47 language tag
 */
protected static Locale getLocale(String language) {
    final Locale known = localeMap.get(language);
    if (known != null || language == null || language.isEmpty()) {
        return known;
    }

    try {
        final Locale parsed = new Locale.Builder().setLanguageTag(language).build();
        localeMap.put(language, parsed);
        return parsed;
    } catch (IllformedLocaleException ex) {
        throw new IllegalArgumentException("Language was not a valid BCP47 language: " + language, ex);
    }
}
/**
 * Return the prepared statement with the given identifier. A cached statement is reused as long as it has
 * not been closed; otherwise a new forward-only, read-only statement is prepared from the SQL the dialect
 * provides for the key and stored in the statement cache.
 * <p/>
 * Before the statement is returned its parameters are cleared and, if the dialect supports cursors, its
 * fetch size is set to the configured cursor size.
 *
 * @param key the id of the statement in statements.properties
 * @return the prepared statement, ready for setting parameters
 * @throws SQLException if the statement cannot be prepared or reset
 */
public PreparedStatement getPreparedStatement(String key) throws SQLException {
    requireJDBCConnection();

    final PreparedStatement cached = statementCache.get(key);
    final PreparedStatement statement;
    if (cached != null && !cached.isClosed()) {
        statement = cached;
    } else {
        statement = connection.prepareStatement(dialect.getStatement(key), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        statementCache.put(key,statement);
    }

    statement.clearParameters();
    if (persistence.getDialect().isCursorSupported()) {
        statement.setFetchSize(persistence.getConfiguration().getCursorSize());
    }
    return statement;
}
/**
 * Variant of {@link #getPreparedStatement(String)} for statements with a variable number of
 * arguments (e.g. in an IN clause). The SQL template of the dialect is run through
 * {@link String#format} with a comma-separated list of {@code numberOfArguments} placeholders
 * as first and the argument count as second format argument. Statements are cached under the
 * key concatenated with the argument count.
 *
 * @param key               the id of the statement in statements.properties
 * @param numberOfArguments how many '?' placeholders to generate
 * @return the prepared statement for the key and argument count, with cleared parameters
 * @throws SQLException if a database access error occurs
 */
public PreparedStatement getPreparedStatement(String key, int numberOfArguments) throws SQLException {
    requireJDBCConnection();

    final String cacheKey = key + numberOfArguments;
    PreparedStatement prepared = statementCache.get(cacheKey);
    if(prepared == null || prepared.isClosed()) {
        // build "?,?,...,?" with exactly numberOfArguments placeholders
        final StringBuilder placeholders = new StringBuilder();
        if(numberOfArguments > 0) {
            placeholders.append('?');
        }
        for(int n = 1; n < numberOfArguments; n++) {
            placeholders.append(',').append('?');
        }

        final String sql = String.format(dialect.getStatement(key), placeholders.toString(), numberOfArguments);
        prepared = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        statementCache.put(cacheKey, prepared);
    }

    prepared.clearParameters();
    if(persistence.getDialect().isCursorSupported()) {
        prepared.setFetchSize(persistence.getConfiguration().getCursorSize());
    }
    return prepared;
}
/**
 * Obtain a fresh identifier from the ID generator of the persistence layer.
 *
 * @return a new sequence ID
 * @throws SQLException declared for callers; see the ID generator in use for when it is raised
 */
public long getNextSequence() throws SQLException {
    final long nextId = persistence.getIdGenerator().getId();
    return nextId;
}
/**
 * Register a node in the in-memory caches: in the node cache by ID (only for non-negative IDs), and
 * in the type-specific cache (URI, blank node or literal) under its string value or literal cache key.
 *
 * @param node the node to cache
 */
private void cacheNode(KiWiNode node) {
    if(node.getId() >= 0) {
        nodeCache.put(node.getId(), node);
    }

    if(node instanceof KiWiUriResource) {
        final KiWiUriResource uri = (KiWiUriResource) node;
        uriCache.put(node.stringValue(), uri);
        return;
    }
    if(node instanceof KiWiAnonResource) {
        final KiWiAnonResource bnode = (KiWiAnonResource) node;
        bnodeCache.put(node.stringValue(), bnode);
        return;
    }
    if(node instanceof KiWiLiteral) {
        final String literalKey = LiteralCommons.createCacheKey((Literal) node);
        literalCache.put(literalKey, (KiWiLiteral) node);
    }
}
/**
 * Put the triple into the triple cache under its ID; triples with a negative ID are ignored.
 *
 * @param triple the triple to cache
 */
private void cacheTriple(KiWiTriple triple) {
    if(triple.getId() < 0) {
        return;
    }
    tripleCache.put(triple.getId(), triple);
}
/**
 * Evict the triple from the triple cache by its ID; triples with a negative ID are ignored.
 *
 * @param triple the triple to remove from the cache
 */
private void removeCachedTriple(KiWiTriple triple) {
    if(triple.getId() < 0) {
        return;
    }
    tripleCache.remove(triple.getId());
}
/**
 * Return the names of the database tables reported by the "meta.tables" statement, converted to
 * lower case. This query is used for checking whether the database needs to be created when
 * initialising the system.
 * <p>
 * Lower-casing uses {@link Locale#ROOT} so that the result does not depend on the JVM default locale
 * (e.g. under a Turkish locale an upper-case 'I' would otherwise not map to 'i'). Rows whose name
 * column is SQL NULL are skipped.
 *
 * @return a set with the lower-cased table names; empty if the query returns no rows
 * @throws SQLException if a database access error occurs
 */
public Set<String> getDatabaseTables() throws SQLException {
    requireJDBCConnection();

    PreparedStatement statement = getPreparedStatement("meta.tables");
    try (ResultSet result = statement.executeQuery()) {
        Set<String> tables = new HashSet<>();
        while(result.next()) {
            String tableName = result.getString(1);
            if(tableName != null) {
                tables.add(tableName.toLowerCase(Locale.ROOT));
            }
        }
        return tables;
    }
}
/**
 * Look up a metadata value by key using the "meta.get" statement; can be used by KiWi modules to
 * retrieve module-specific metadata.
 *
 * @param key the metadata key to look up
 * @return the first column of the first result row, or null if the query yields no row
 * @throws SQLException if a database access error occurs
 */
public String getMetadata(String key) throws SQLException {
    requireJDBCConnection();

    final PreparedStatement lookup = getPreparedStatement("meta.get");
    lookup.setString(1, key);

    final ResultSet rows = lookup.executeQuery();
    try {
        return rows.next() ? rows.getString(1) : null;
    } finally {
        rows.close();
    }
}
/**
 * Update the metadata value for the given key; can be used by KiWi modules to set module-specific
 * metadata. The key is first looked up with the "meta.get" statement: if a row exists, the value is
 * changed with "meta.update", otherwise a new entry is created with "meta.insert".
 *
 * @param key   the metadata key to write
 * @param value the value to store for the key
 * @throws SQLException if a database access error occurs
 */
public void setMetadata(String key, String value) throws SQLException {
    requireJDBCConnection();

    PreparedStatement lookup = getPreparedStatement("meta.get");
    // the lookup is parameterised by key (see getMetadata); without this binding the existence
    // check would run with an unset parameter
    lookup.setString(1, key);

    // decide between update and insert, and close the result set before issuing the write
    final boolean exists;
    ResultSet result = lookup.executeQuery();
    try {
        exists = result.next();
    } finally {
        result.close();
    }

    if(exists) {
        PreparedStatement update = getPreparedStatement("meta.update");
        update.setString(1, value);
        update.setString(2, key);
        update.executeUpdate();
    } else {
        PreparedStatement insert = getPreparedStatement("meta.insert");
        insert.setString(1, key);
        insert.setString(2, value);
        insert.executeUpdate();
    }
}
/**
 * Read the KiWi version of the database this connection is operating on via the "meta.version"
 * statement. This query is necessary for checking the proper state of a database when initialising
 * the system.
 *
 * @return the version, parsed as integer from the first column of the first result row
 * @throws SQLException if the query yields no row or a database access error occurs
 */
public int getDatabaseVersion() throws SQLException {
    requireJDBCConnection();

    final PreparedStatement versionQuery = getPreparedStatement("meta.version");
    final ResultSet rows = versionQuery.executeQuery();
    try {
        if(!rows.next()) {
            throw new SQLException("no version information available");
        }
        return Integer.parseInt(rows.getString(1));
    } finally {
        rows.close();
    }
}
/**
 * Set the auto-commit mode of this connection. The requested mode is always remembered in this
 * object; if a JDBC connection is currently held (non-null), the mode is also forwarded to it via
 * {@link java.sql.Connection#setAutoCommit(boolean)}, whose contract then applies: in auto-commit
 * mode every SQL statement is executed and committed as an individual transaction, otherwise
 * statements are grouped into transactions terminated by <code>commit</code> or
 * <code>rollback</code>.
 *
 * @param autoCommit <code>true</code> to enable auto-commit mode;
 *                   <code>false</code> to disable it
 * @exception java.sql.SQLException if the underlying JDBC connection rejects the change
 * @see #getAutoCommit
 */
public void setAutoCommit(boolean autoCommit) throws SQLException {
    this.autoCommit = autoCommit;
    if(connection == null) {
        // no JDBC connection to forward the setting to
        return;
    }
    connection.setAutoCommit(autoCommit);
}
/**
 * Report the auto-commit mode remembered by this object, i.e. the value most recently stored in
 * its auto-commit flag; the underlying JDBC connection is not queried.
 *
 * @return the stored auto-commit flag
 * @exception java.sql.SQLException declared for API compatibility
 * @see #setAutoCommit
 */
public boolean getAutoCommit() throws SQLException {
    return this.autoCommit;
}
/**
 * Tell whether batched commits are enabled. Batched commits will try to group database operations
 * and keep a memory log while storing triples, which can considerably improve the database
 * performance.
 *
 * @return true if batched commits are enabled
 */
public boolean isBatchCommit() {
    return this.batchCommit;
}
/**
 * Enable or disable batched commits. Batched commits will try to group database operations and
 * keep a memory log while storing triples, which can considerably improve the database performance.
 * If the database dialect does not support batches, the setting is left untouched and a warning is
 * logged instead.
 *
 * @param batchCommit true to enable batched commits, false to disable them
 */
public void setBatchCommit(boolean batchCommit) {
    if(!dialect.isBatchSupported()) {
        log.warn("batch commits are not supported by this database dialect");
        return;
    }
    this.batchCommit = batchCommit;
}
/**
 * Report the configured size of a batch for batched commits. Batched commits will try to group
 * database operations and keep a memory log while storing triples, which can considerably improve
 * the database performance.
 *
 * @return the configured batch size
 */
public int getBatchSize() {
    return this.batchSize;
}
/**
 * Configure the size of a batch for batched commits. Batched commits will try to group database
 * operations and keep a memory log while storing triples, which can considerably improve the
 * database performance. The value is stored as given, without validation.
 *
 * @param batchSize the new batch size
 */
public void setBatchSize(final int batchSize) {
    this.batchSize = batchSize;
}
/**
 * Make the changes of the current transaction permanent. The work runs inside a retry execution
 * named "COMMIT": a non-empty triple batch is flushed first, the log of deleted statements is
 * replaced by a fresh bloom filter, and the JDBC connection (if one is held) is committed.
 * Afterwards a new transaction ID is drawn from the sequence. The commit counter is incremented
 * on every call.
 *
 * @exception java.sql.SQLException if flushing or committing fails and the retry execution gives up
 * @see #setAutoCommit
 */
public synchronized void commit() throws SQLException {
    numberOfCommits++;

    final RetryCommand<Void> commitWork = new RetryCommand<Void>() {
        @Override
        public Void run() throws SQLException {
            final boolean pendingTriples = tripleBatch != null && tripleBatch.size() > 0;
            if(pendingTriples) {
                flushBatch();
            }

            deletedStatementsLog = BloomFilter.create(Funnels.longFunnel(), 100000);

            if(connection != null) {
                connection.commit();
            }
            return null;
        }
    };

    final RetryExecution<Void> retrying = new RetryExecution<Void>("COMMIT");
    retrying.execute(connection, commitWork);

    this.transactionId = getNextSequence();
}
/**
 * Undo the changes of the current transaction. Triples still waiting in the triple batch get their
 * ID reset to -1 and are dropped from the batch, the log of deleted statements is replaced by a
 * fresh bloom filter, and the JDBC connection is rolled back if one is held and not closed.
 * Afterwards a new transaction ID is drawn from the sequence.
 *
 * @exception java.sql.SQLException if the rollback on the JDBC connection fails
 * @see #setAutoCommit
 */
public void rollback() throws SQLException {
    final boolean pendingTriples = tripleBatch != null && tripleBatch.size() > 0;
    if(pendingTriples) {
        synchronized (tripleBatch) {
            for(KiWiTriple pending : tripleBatch) {
                // the triple never reached the database, so it is marked as not persisted
                pending.setId(-1L);
            }
            tripleBatch.clear();
        }
    }

    deletedStatementsLog = BloomFilter.create(Funnels.longFunnel(), 100000);

    final boolean connectionOpen = connection != null && !connection.isClosed();
    if(connectionOpen) {
        connection.rollback();
    }

    this.transactionId = getNextSequence();
}
/**
 * Report whether the underlying JDBC connection is closed. The answer is delegated to
 * {@link java.sql.Connection#isClosed()} when a JDBC connection is held; when none is held
 * (the connection field is null) this method returns <code>false</code>.
 *
 * @return <code>true</code> if a JDBC connection is held and reports itself as closed;
 *         <code>false</code> otherwise
 * @exception java.sql.SQLException if a database access error occurs
 */
public boolean isClosed() throws SQLException {
    return connection != null && connection.isClosed();
}
/**
 * Mark this connection as closed and release its JDBC resources: every cached prepared statement
 * is closed and the JDBC connection (if one is held) is handed back to the persistence layer.
 * <P>
 * Failures while closing an individual statement are logged at debug level and do not prevent
 * the remaining statements from being closed. The JDBC connection is released in a
 * <code>finally</code> block, so it is handed back even if closing the statements fails
 * unexpectedly.
 * <P>
 * It is <b>strongly recommended</b> that an application explicitly
 * commits or rolls back an active transaction prior to calling the
 * <code>close</code> method.
 *
 * @exception java.sql.SQLException if releasing the JDBC connection fails
 */
public void close() throws SQLException {
    closed = true;

    if(connection == null) {
        // no JDBC connection is held, nothing to release
        return;
    }

    try {
        // close all prepared statements
        for(PreparedStatement statement : statementCache.values()) {
            try {
                statement.close();
            } catch (SQLException ex) {
                // best effort: keep closing the remaining statements, but leave a trace
                log.debug("could not close prepared statement: {}", ex.getMessage());
            }
        }
    } catch(AbstractMethodError ex) {
        log.debug("database system does not allow closing statements");
    } finally {
        persistence.releaseJDBCConnection(connection);
    }
}
// NOTE(review): package-private counter initialised to 0; it is not referenced by the neighbouring
// methods (flushBatch, RetryExecution keep their own state) - confirm whether it is still used.
int retry = 0;
/**
 * Write all triples currently held in the triple batch to the database using the "store.triple"
 * statement in JDBC batch mode. Does nothing unless batched commits are enabled and a triple
 * batch exists.
 * <p>
 * The work is done while holding the commit lock and runs inside a retry execution named
 * "FLUSH BATCH" that has savepoints enabled. Triples with a negative ID receive a new ID from
 * the sequence before they are written.
 *
 * @throws SQLException if the batch insert fails and the retry execution gives up
 */
public synchronized void flushBatch() throws SQLException {
if(batchCommit && tripleBatch != null) {
requireJDBCConnection();
commitLock.lock();
try {
RetryExecution execution = new RetryExecution("FLUSH BATCH");
// a savepoint is set before each attempt and rolled back to when an attempt is retried
execution.setUseSavepoint(true);
execution.execute(connection, new RetryCommand<Void>() {
@Override
public Void run() throws SQLException {
PreparedStatement insertTriple = getPreparedStatement("store.triple");
// start from a clean statement: no leftover parameters or batch entries
insertTriple.clearParameters();
insertTriple.clearBatch();
synchronized (tripleBatch) {
for(KiWiTriple triple : tripleBatch) {
// retrieve a new triple ID and set it in the object
if(triple.getId() < 0) {
triple.setId(getNextSequence());
}
// parameter order: id, subject, predicate, object, context, inferred, created
insertTriple.setLong(1,triple.getId());
insertTriple.setLong(2,triple.getSubject().getId());
insertTriple.setLong(3,triple.getPredicate().getId());
insertTriple.setLong(4,triple.getObject().getId());
if(triple.getContext() != null) {
insertTriple.setLong(5,triple.getContext().getId());
} else {
// a triple without context is stored with SQL NULL in the context column
insertTriple.setNull(5, Types.BIGINT);
}
insertTriple.setBoolean(6,triple.isInferred());
insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
insertTriple.addBatch();
}
}
// NOTE(review): executing and clearing happen after the synchronized block on tripleBatch
// has been left - confirm that no other thread can add triples in between
insertTriple.executeBatch();
tripleBatch.clear();
return null;
}
});
} finally {
// always give the commit lock back, also when the flush failed
commitLock.unlock();
}
}
}
/**
 * Report the ID of the current transaction, i.e. the value currently stored in the transaction ID
 * field.
 *
 * @return the current transaction ID
 */
public long getTransactionId() {
    return this.transactionId;
}
/**
 * A unit of database work that can be handed to a {@code RetryExecution} and may be invoked more
 * than once if an attempt fails with an SQL exception.
 *
 * @param <T> the type of the result produced by the command
 */
protected interface RetryCommand<T> {

    /**
     * Perform the work once.
     *
     * @return the result of the work
     * @throws SQLException if the attempt fails
     */
    T run() throws SQLException;
}
/**
 * A generic implementation of an SQL command that might fail (e.g. because of a timeout or concurrency situation)
 * and should be retried several times before giving up completely.
 * <p>
 * A failed attempt is retried when the maximum number of retries has not been reached and either no
 * SQL states have been configured or the SQL state of the exception is contained in the configured
 * set. Between attempts the executing thread sleeps for the retry interval with a random jitter of
 * -250 to +249 milliseconds (never less than 0).
 *
 * @param <T> the type of the result produced by the executed command
 */
protected class RetryExecution<T> {

    // counter for current number of retries
    private int retries = 0;

    // how often to reattempt the operation
    private int maxRetries = 10;

    // how long to wait before retrying
    private long retryInterval = 1000;

    // use an SQL savepoint and roll back in case a retry is needed?
    private boolean useSavepoint = false;

    // name used to identify this execution in log messages
    private String name;

    // if non-empty: only retry on the SQL states contained in this set
    private Set<String> sqlStates;

    /**
     * Create a retry execution with default settings (10 retries, 1000 ms interval, no savepoint,
     * retry on any SQL state).
     *
     * @param name name used to identify this execution in log messages
     */
    public RetryExecution(String name) {
        this.name = name;
        this.sqlStates = new HashSet<>();
    }

    /** @return the name used to identify this execution in log messages */
    public String getName() {
        return name;
    }

    /** @param name the name used to identify this execution in log messages */
    public void setName(String name) {
        this.name = name;
    }

    /** @return how often the operation is reattempted before giving up */
    public int getMaxRetries() {
        return maxRetries;
    }

    /** @param maxRetries how often the operation is reattempted before giving up */
    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** @return the base waiting time between attempts in milliseconds */
    public long getRetryInterval() {
        return retryInterval;
    }

    /** @param retryInterval the base waiting time between attempts in milliseconds */
    public void setRetryInterval(long retryInterval) {
        this.retryInterval = retryInterval;
    }

    /** @return true if each attempt is guarded by an SQL savepoint */
    public boolean isUseSavepoint() {
        return useSavepoint;
    }

    /** @param useSavepoint true to guard each attempt by an SQL savepoint that is rolled back before a retry */
    public void setUseSavepoint(boolean useSavepoint) {
        this.useSavepoint = useSavepoint;
    }

    /** @return the modifiable set of SQL states to retry on; when empty, every SQL exception is retried */
    public Set<String> getSqlStates() {
        return sqlStates;
    }

    /**
     * Run the command, retrying it according to the configuration of this execution.
     *
     * @param connection the JDBC connection used for savepoint handling; only accessed when
     *                   savepoints are enabled
     * @param command    the work to carry out
     * @return the result of the first successful attempt, or null if the enclosing connection
     *         has been marked as closed
     * @throws SQLException the exception of the last attempt if it is not eligible for a retry
     *                      or the maximum number of retries has been reached
     */
    public T execute(Connection connection, RetryCommand<T> command) throws SQLException {
        if(closed) {
            return null;
        }

        Savepoint savepoint = null;
        if(useSavepoint) {
            savepoint = connection.setSavepoint();
        }

        try {
            T result = command.run();

            if(useSavepoint && savepoint != null) {
                connection.releaseSavepoint(savepoint);
            }

            return result;
        } catch (SQLException ex) {
            boolean retryable = retries < maxRetries && (sqlStates.size() == 0 || sqlStates.contains(ex.getSQLState()));
            if(!retryable) {
                log.error("{}: temporary conflict could not be solved! (error: {})", name, ex.getMessage());
                log.debug("main exception:",ex);
                log.debug("next exception:",ex.getNextException());
                throw ex;
            }

            if(useSavepoint && savepoint != null) {
                connection.rollback(savepoint);
            }

            Random rnd = new Random();
            // jitter avoids that competing threads retry in lock-step; clamp at 0 because
            // Thread.sleep rejects negative values (possible for retry intervals below 250 ms)
            long sleep = Math.max(0L, retryInterval - 250 + rnd.nextInt(500));
            log.warn("{}: temporary conflict, retrying in {} ms ... (thread={}, retry={})", name, sleep, Thread.currentThread().getName(), retries);
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                // keep the interrupt visible to callers instead of silently dropping it
                Thread.currentThread().interrupt();
            }

            retries++;
            try {
                return execute(connection, command);
            } finally {
                // restore the counter also when the nested attempt failed
                retries--;
            }
        }
    }
}
}