blob: c5daf41b57603f230ea72894507efe1025dbd657 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document.rdb;
import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet;
import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeStatement;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getModuleVersion;
import java.io.Closeable;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.jackrabbit.oak.commons.PerfLogger;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder;
import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
import org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.PreparedStatementComponent;
import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
public class RDBBlobStore extends CachingBlobStore implements Closeable {
/**
* Creates a {@linkplain RDBBlobStore} instance using the provided
* {@link DataSource} using the given {@link DocumentNodeStoreBuilder} and
* {@link RDBOptions}.
*/
public RDBBlobStore(@NotNull DataSource ds, @Nullable DocumentNodeStoreBuilder<?> builder, @Nullable RDBOptions options) {
try {
initialize(ds, builder, options == null ? new RDBOptions() : options);
} catch (Exception ex) {
throw new DocumentStoreException("initializing RDB blob store", ex);
}
}
/**
* Creates a {@linkplain RDBBlobStore} instance using the provided
* {@link DataSource} using default {@link DocumentNodeStoreBuilder} and the
* given {@link RDBOptions}.
*/
public RDBBlobStore(@NotNull DataSource ds, @Nullable RDBOptions options) {
this(ds, null, options);
}
/**
* Creates a {@linkplain RDBBlobStore} instance using the provided
* {@link DataSource} using default {@link DocumentNodeStoreBuilder} and
* {@link RDBOptions}.
*/
public RDBBlobStore(@NotNull DataSource ds) {
this(ds, null, null);
}
@Override
public void close() {
String dropped = "";
if (!this.tablesToBeDropped.isEmpty()) {
LOG.debug("attempting to drop: " + this.tablesToBeDropped);
for (String tname : this.tablesToBeDropped) {
Connection con = null;
try {
con = this.ch.getRWConnection();
Statement stmt = null;
try {
stmt = con.createStatement();
stmt.execute("drop table " + tname);
stmt.close();
con.commit();
dropped += tname + " ";
} catch (SQLException ex) {
LOG.debug("attempting to drop: " + tname, ex);
} finally {
closeStatement(stmt);
}
} catch (SQLException ex) {
LOG.debug("attempting to drop: " + tname, ex);
} finally {
this.ch.closeConnection(con);
}
}
dropped = dropped.trim();
}
try {
this.ch.close();
} catch (IOException ex) {
LOG.error("closing connection handler", ex);
}
LOG.info("RDBBlobStore (" + getModuleVersion() + ") closed"
+ (dropped.isEmpty() ? "" : " (tables dropped: " + dropped + ")"));
}
@Override
protected void finalize() throws Throwable {
if (!this.ch.isClosed() && this.callStack != null) {
LOG.debug("finalizing RDBDocumentStore that was not disposed", this.callStack);
}
super.finalize();
}
private static final Logger LOG = LoggerFactory.getLogger(RDBBlobStore.class);
private static final PerfLogger PERFLOG = new PerfLogger(
LoggerFactory.getLogger(RDBBlobStore.class.getName() + ".perf"));
// ID size we need to support; is 2 * (hex) size of digest length
protected static final int IDSIZE;
static {
try {
MessageDigest md = MessageDigest.getInstance(AbstractBlobStore.HASH_ALGORITHM);
IDSIZE = md.getDigestLength() * 2;
} catch (NoSuchAlgorithmException ex) {
LOG.error ("can't determine digest length for blob store", ex);
throw new RuntimeException(ex);
}
}
private Exception callStack;
protected RDBConnectionHandler ch;
// from options
protected String tnData;
protected String tnMeta;
private Set<String> tablesToBeDropped = new HashSet<String>();
private boolean readOnly;
private void initialize(DataSource ds, DocumentNodeStoreBuilder<?> builder, RDBOptions options) throws Exception {
this.readOnly = builder == null ? false : builder.getReadOnlyMode();
this.tnData = RDBJDBCTools.createTableName(options.getTablePrefix(), "DATASTORE_DATA");
this.tnMeta = RDBJDBCTools.createTableName(options.getTablePrefix(), "DATASTORE_META");
this.ch = new RDBConnectionHandler(ds);
Connection con = this.ch.getRWConnection();
int isolation = con.getTransactionIsolation();
String isolationDiags = RDBJDBCTools.isolationLevelToString(isolation);
if (isolation != Connection.TRANSACTION_READ_COMMITTED) {
LOG.info("Detected transaction isolation level " + isolationDiags + " is "
+ (isolation < Connection.TRANSACTION_READ_COMMITTED ? "lower" : "higher") + " than expected "
+ RDBJDBCTools.isolationLevelToString(Connection.TRANSACTION_READ_COMMITTED)
+ " - check datasource configuration");
}
DatabaseMetaData md = con.getMetaData();
RDBBlobStoreDB db = RDBBlobStoreDB.getValue(md.getDatabaseProductName());
String versionDiags = db.checkVersion(md);
if (!versionDiags.isEmpty()) {
LOG.error(versionDiags);
}
String dbDesc = String.format("%s %s (%d.%d)", md.getDatabaseProductName(), md.getDatabaseProductVersion(),
md.getDatabaseMajorVersion(), md.getDatabaseMinorVersion()).replaceAll("[\r\n\t]", " ").trim();
String driverDesc = String.format("%s %s (%d.%d)", md.getDriverName(), md.getDriverVersion(), md.getDriverMajorVersion(),
md.getDriverMinorVersion()).replaceAll("[\r\n\t]", " ").trim();
String dbUrl = md.getURL();
List<String> tablesCreated = new ArrayList<String>();
List<String> tablesPresent = new ArrayList<String>();
Map<String, String> tableInfo = new HashMap<>();
Statement createStatement = null;
try {
for (String tableName : new String[] { this.tnData, this.tnMeta }) {
Statement checkStatement = null;
try {
// avoid PreparedStatement due to weird DB2 behavior (OAK-6237)
checkStatement = con.createStatement();
ResultSet checkResultSet = checkStatement.executeQuery("select * from " + tableName + " where ID = '0'");
// try to discover metadata
tableInfo.put(tableName, RDBJDBCTools.dumpResultSetMeta(checkResultSet.getMetaData()));
closeResultSet(checkResultSet);
checkStatement = closeStatement(checkStatement);
con.commit();
tablesPresent.add(tableName);
} catch (SQLException ex) {
checkStatement = closeStatement(checkStatement);
// table does not appear to exist
con.rollback();
LOG.debug("trying to read from '" + tableName + "'", ex);
if (this.readOnly) {
throw new SQLException("Would like to create table '" + tableName
+ "', but RDBBlobStore has been initialized in 'readonly' mode");
}
createStatement = con.createStatement();
if (this.tnMeta.equals(tableName)) {
String ct = db.getMetaTableCreationStatement(tableName);
createStatement.execute(ct);
} else {
String ct = db.getDataTableCreationStatement(tableName);
createStatement.execute(ct);
}
createStatement.close();
createStatement = null;
con.commit();
ResultSet checkResultSet = null;
try {
checkStatement = con.createStatement();
checkResultSet = checkStatement.executeQuery("select * from " + tableName + " where ID = '0'");
tableInfo.put(tableName, RDBJDBCTools.dumpResultSetMeta(checkResultSet.getMetaData()));
con.commit();
} finally {
closeResultSet(checkResultSet);
closeStatement(checkStatement);
}
tablesCreated.add(tableName);
}
}
if (options.isDropTablesOnClose()) {
tablesToBeDropped.addAll(tablesCreated);
}
Map<String, String> diag = db.getAdditionalDiagnostics(this.ch, this.tnData);
LOG.info("RDBBlobStore (" + getModuleVersion() + ") instantiated for database " + dbDesc + ", using driver: "
+ driverDesc + ", connecting to: " + dbUrl + (diag.isEmpty() ? "" : (", properties: " + diag.toString()))
+ ", transaction isolation level: " + isolationDiags + ", " + tableInfo);
if (!tablesPresent.isEmpty()) {
LOG.info("Tables present upon startup: " + tablesPresent);
}
if (!tablesCreated.isEmpty()) {
LOG.info("Tables created upon startup: " + tablesCreated
+ (options.isDropTablesOnClose() ? " (will be dropped on exit)" : ""));
}
String moreDiags = db.evaluateDiagnostics(diag);
if (moreDiags != null) {
LOG.info(moreDiags);
}
this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of RDBBlobStore creation") : null;
} finally {
closeStatement(createStatement);
this.ch.closeConnection(con);
}
}
private long minLastModified;
@Override
protected void storeBlock(byte[] digest, int level, byte[] data) throws IOException {
if (this.readOnly) {
throw new IOException("RDBBlobStore has been initialized in 'readonly' mode");
}
try {
storeBlockInDatabase(digest, level, data);
} catch (SQLException e) {
throw new IOException(e);
}
}
private void storeBlockInDatabase(byte[] digest, int level, byte[] data) throws SQLException {
String id = StringUtils.convertBytesToHex(digest);
cache.put(id, data);
Connection con = this.ch.getRWConnection();
try {
long now = System.currentTimeMillis();
PreparedStatement prep = con.prepareStatement("update " + this.tnMeta + " set LASTMOD = ? where ID = ?");
int count;
try {
prep.setLong(1, now);
prep.setString(2, id);
count = prep.executeUpdate();
}
catch (SQLException ex) {
LOG.error("trying to update metadata", ex);
throw new SQLException("trying to update metadata", ex);
}
finally {
prep.close();
}
if (count == 0) {
try {
prep = con.prepareStatement("insert into " + this.tnData + " (ID, DATA) values(?, ?)");
try {
prep.setString(1, id);
prep.setBytes(2, data);
int rows = prep.executeUpdate();
LOG.trace("insert-data id={} rows={}", id, rows);
if (rows != 1) {
throw new SQLException("Insert of id " + id + " into " + this.tnData + " failed with result " + rows);
}
} finally {
prep.close();
}
} catch (SQLException ex) {
this.ch.rollbackConnection(con);
// the insert failed although it should have succeeded; see whether the blob already exists
prep = con.prepareStatement("select DATA from " + this.tnData + " where ID = ?");
ResultSet rs = null;
byte[] dbdata = null;
try {
prep.setString(1, id);
rs = prep.executeQuery();
if (rs.next()) {
dbdata = rs.getBytes(1);
}
} finally {
closeResultSet(rs);
closeStatement(prep);
}
if (dbdata == null) {
// insert failed although record isn't there
String message = "insert document failed for id " + id + " with length " + data.length + " (check max size of datastore_data.data)";
LOG.error(message, ex);
throw new SQLException(message, ex);
}
else if (!Arrays.equals(data, dbdata)) {
// record is there but contains different data
String message = "DATA table already contains blob for id " + id + ", but the actual data differs (lengths: " + data.length + ", " + dbdata.length + ")";
LOG.error(message, ex);
throw new SQLException(message, ex);
}
else {
// just recover
LOG.info("recovered from DB inconsistency for id " + id + ": meta record was missing (impact will be minor performance degradation)");
}
}
try {
prep = con.prepareStatement("insert into " + this.tnMeta + " (ID, LVL, LASTMOD) values(?, ?, ?)");
try {
prep.setString(1, id);
prep.setInt(2, level);
prep.setLong(3, now);
int rows = prep.executeUpdate();
LOG.trace("insert-meta id={} rows={}", id, rows);
if (rows != 1) {
throw new SQLException("Insert of id " + id + " into " + this.tnMeta + " failed with result " + rows);
}
} finally {
prep.close();
}
} catch (SQLException e) {
// already exists - ok
LOG.debug("inserting meta record for id " + id, e);
}
}
} finally {
con.commit();
this.ch.closeConnection(con);
}
}
// needed in test
protected byte[] readBlockFromBackend(byte[] digest) throws Exception {
return readBlockFromBackend(StringUtils.convertBytesToHex(digest));
}
private byte[] readBlockFromBackend(String id) throws Exception {
Connection con = this.ch.getROConnection();
byte[] data;
try {
long pstart = PERFLOG.start(PERFLOG.isDebugEnabled() ? ("reading: " + id) : null);
PreparedStatement prep = con.prepareStatement("select DATA from " + this.tnData + " where ID = ?");
ResultSet rs = null;
try {
prep.setString(1, id);
rs = prep.executeQuery();
if (!rs.next()) {
PERFLOG.end(pstart, 10, "read: table={}, id={} -> not found", this.tnData, id);
throw new IOException("Datastore block " + id + " not found");
}
data = rs.getBytes(1);
PERFLOG.end(pstart, 10, "read: table={}, id={} -> data={}", this.tnData, id, (data == null ? 0 : data.length));
} catch (SQLException ex) {
PERFLOG.end(pstart, 10, "read: table={} -> exception={}", this.tnData, ex.getMessage());
throw ex;
} finally {
closeResultSet(rs);
closeStatement(prep);
}
} finally {
con.commit();
this.ch.closeConnection(con);
}
return data;
}
@Override
protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
String id = StringUtils.convertBytesToHex(blockId.getDigest());
byte[] data = cache.get(id);
if (data == null) {
long start = System.nanoTime();
data = readBlockFromBackend(id);
getStatsCollector().downloaded(id, System.nanoTime() - start, TimeUnit.NANOSECONDS, data.length);
cache.put(id, data);
}
if (blockId.getPos() == 0) {
return data;
}
int len = (int) (data.length - blockId.getPos());
if (len < 0) {
return new byte[0];
}
byte[] d2 = new byte[len];
System.arraycopy(data, (int) blockId.getPos(), d2, 0, len);
return d2;
}
@Override
public void startMark() throws IOException {
minLastModified = System.currentTimeMillis();
markInUse();
}
@Override
protected boolean isMarkEnabled() {
return minLastModified != 0;
}
@Override
protected void mark(BlockId blockId) throws Exception {
Connection con = this.ch.getRWConnection();
PreparedStatement prep = null;
try {
if (minLastModified == 0) {
return;
}
String id = StringUtils.convertBytesToHex(blockId.getDigest());
prep = con.prepareStatement("update " + this.tnMeta + " set LASTMOD = ? where ID = ? and LASTMOD < ?");
prep.setLong(1, System.currentTimeMillis());
prep.setString(2, id);
prep.setLong(3, minLastModified);
prep.executeUpdate();
prep.close();
} finally {
closeStatement(prep);
con.commit();
this.ch.closeConnection(con);
}
}
@Override
public int sweep() throws IOException {
try {
return sweepFromDatabase();
} catch (SQLException e) {
throw new IOException(e);
}
}
private int sweepFromDatabase() throws SQLException {
Connection con = this.ch.getRWConnection();
PreparedStatement prepCheck = null, prepDelMeta = null, prepDelData = null;
ResultSet rs = null;
try {
int count = 0;
prepCheck = con.prepareStatement("select ID from " + this.tnMeta + " where LASTMOD < ?");
prepCheck.setLong(1, minLastModified);
rs = prepCheck.executeQuery();
ArrayList<String> ids = new ArrayList<String>();
while (rs.next()) {
ids.add(rs.getString(1));
}
rs.close();
rs = null;
prepCheck.close();
prepCheck = null;
prepDelMeta = con.prepareStatement("delete from " + this.tnMeta + " where ID = ?");
prepDelData = con.prepareStatement("delete from " + this.tnData + " where ID = ?");
for (String id : ids) {
prepDelMeta.setString(1, id);
int mrows = prepDelMeta.executeUpdate();
LOG.trace("delete-meta id={} rows={}", id, mrows);
prepDelData.setString(1, id);
int drows = prepDelData.executeUpdate();
LOG.trace("delete-data id={} rows={}", id, drows);
count++;
}
prepDelMeta.close();
prepDelMeta = null;
prepDelData.close();
prepDelData = null;
minLastModified = 0;
return count;
} finally {
closeResultSet(rs);
closeStatement(prepCheck);
closeStatement(prepDelMeta);
closeStatement(prepDelData);
con.commit();
this.ch.closeConnection(con);
}
}
@Override
public long countDeleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
long count = 0;
for (List<String> chunk : Lists.partition(chunkIds, RDBJDBCTools.MAX_IN_CLAUSE)) {
Connection con = this.ch.getRWConnection();
PreparedStatement prepMeta = null;
PreparedStatement prepData = null;
try {
PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", chunk, false);
StringBuilder metaStatement = new StringBuilder("delete from " + this.tnMeta + " where ")
.append(inClause.getStatementComponent());
StringBuilder dataStatement = new StringBuilder("delete from " + this.tnData + " where ")
.append(inClause.getStatementComponent());
if (maxLastModifiedTime > 0) {
// delete only if the last modified is OLDER than x
metaStatement.append(" and LASTMOD <= ?");
// delete if there is NO entry where the last modified of
// the meta is YOUNGER than x
dataStatement.append(" and not exists(select * from " + this.tnMeta + " where " + this.tnMeta + ".ID = "
+ this.tnData + ".ID and LASTMOD > ?)");
}
prepMeta = con.prepareStatement(metaStatement.toString());
prepData = con.prepareStatement(dataStatement.toString());
int mindex = 1, dindex = 1;
mindex = inClause.setParameters(prepMeta, mindex);
dindex = inClause.setParameters(prepData, dindex);
if (maxLastModifiedTime > 0) {
prepMeta.setLong(mindex, maxLastModifiedTime);
prepData.setLong(dindex, maxLastModifiedTime);
}
int deletedMeta = prepMeta.executeUpdate();
LOG.trace("delete-meta rows={}", deletedMeta);
int deletedData = prepData.executeUpdate();
LOG.trace("delete-data rows={}", deletedData);
if (deletedMeta != deletedData) {
String message = String.format(
"chunk deletion affected different numbers of DATA records (%s) and META records (%s)", deletedData,
deletedMeta);
LOG.info(message);
}
count += deletedMeta;
} finally {
closeStatement(prepMeta);
closeStatement(prepData);
con.commit();
this.ch.closeConnection(con);
}
}
return count;
}
@Override
public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception {
return new ChunkIdIterator(this.ch, maxLastModifiedTime, this.tnMeta);
}
/**
* Reads chunk IDs in batches.
*/
private static class ChunkIdIterator extends AbstractIterator<String> {
private long maxLastModifiedTime;
private RDBConnectionHandler ch;
private static int BATCHSIZE = 1024 * 64;
private List<String> results = new LinkedList<String>();
private String lastId = null;
private String metaTable;
public ChunkIdIterator(RDBConnectionHandler ch, long maxLastModifiedTime, String metaTable) {
this.maxLastModifiedTime = maxLastModifiedTime;
this.ch = ch;
this.metaTable = metaTable;
}
@Override
protected String computeNext() {
if (!results.isEmpty()) {
return results.remove(0);
} else {
// need to refill
if (refill()) {
return computeNext();
} else {
return endOfData();
}
}
}
private boolean refill() {
StringBuffer query = new StringBuffer();
query.append("select ID from " + metaTable);
if (maxLastModifiedTime > 0) {
query.append(" where LASTMOD <= ?");
if (lastId != null) {
query.append(" and ID > ?");
}
} else {
if (lastId != null) {
query.append(" where ID > ?");
}
}
query.append(" order by ID");
Connection connection = null;
try {
connection = this.ch.getROConnection();
PreparedStatement prep = null;
ResultSet rs = null;
try {
prep = connection.prepareStatement(query.toString());
int idx = 1;
if (maxLastModifiedTime > 0) {
prep.setLong(idx++, maxLastModifiedTime);
}
if (lastId != null) {
prep.setString(idx, lastId);
}
prep.setFetchSize(BATCHSIZE);
rs = prep.executeQuery();
while (rs.next()) {
lastId = rs.getString(1);
results.add(lastId);
}
rs.close();
rs = null;
return !results.isEmpty();
} finally {
closeResultSet(rs);
closeStatement(prep);
connection.commit();
this.ch.closeConnection(connection);
}
} catch (SQLException ex) {
LOG.debug("error executing ID lookup", ex);
this.ch.rollbackConnection(connection);
this.ch.closeConnection(connection);
return false;
}
}
}
}