| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.jackrabbit.oak.plugins.document.rdb; |
| |
| import static com.google.common.collect.Iterables.transform; |
| import static com.google.common.collect.Sets.newHashSet; |
| import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.CHAR2OCTETRATIO; |
| import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.asBytes; |
| import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.asDocumentStoreException; |
| import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet; |
| import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeStatement; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.io.UnsupportedEncodingException; |
| import java.sql.BatchUpdateException; |
| import java.sql.Connection; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.sql.Types; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.NoSuchElementException; |
| import java.util.Set; |
| |
| import org.apache.jackrabbit.oak.commons.PerfLogger; |
| import org.apache.jackrabbit.oak.plugins.document.Document; |
| import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException; |
| import org.apache.jackrabbit.oak.plugins.document.NodeDocument; |
| import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.QueryCondition; |
| import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.RDBTableMetaData; |
| import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStoreDB.FETCHFIRSTSYNTAX; |
| import org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.PreparedStatementComponent; |
| import org.apache.jackrabbit.oak.plugins.document.util.UTF8Encoder; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Function; |
| import com.google.common.base.Strings; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| |
| /** |
| * Implements (most) DB interactions used in {@link RDBDocumentStore}. |
| */ |
| public class RDBDocumentStoreJDBC { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(RDBDocumentStoreJDBC.class); |
| private static final PerfLogger PERFLOG = new PerfLogger( |
| LoggerFactory.getLogger(RDBDocumentStoreJDBC.class.getName() + ".perf")); |
| |
| private static final String COLLISIONSMODCOUNT = RDBDocumentStore.COLLISIONSMODCOUNT; |
| private static final String MODCOUNT = NodeDocument.MOD_COUNT; |
| private static final String MODIFIED = NodeDocument.MODIFIED_IN_SECS; |
| |
| private static final int SCHEMAVERSION = RDBDocumentStore.SCHEMA; |
| |
| private final RDBDocumentStoreDB dbInfo; |
| private final RDBDocumentSerializer ser; |
| private final int queryHitsLimit, queryTimeLimit; |
| |
| private static final Long INITIALMODCOUNT = Long.valueOf(1); |
| |
| public RDBDocumentStoreJDBC(RDBDocumentStoreDB dbInfo, RDBDocumentSerializer ser, int queryHitsLimit, int queryTimeLimit) { |
| this.dbInfo = dbInfo; |
| this.ser = ser; |
| this.queryHitsLimit = queryHitsLimit; |
| this.queryTimeLimit = queryTimeLimit; |
| } |
| |
| public boolean appendingUpdate(Connection connection, RDBTableMetaData tmd, String id, Long modified, |
| boolean setModifiedConditionally, Number hasBinary, Boolean deletedOnce, Long modcount, Long cmodcount, |
| Long oldmodcount, String appendData) throws SQLException { |
| String appendDataWithComma = "," + appendData; |
| PreparedStatementComponent stringAppend = this.dbInfo.getConcatQuery(appendDataWithComma, tmd.getDataLimitInOctets()); |
| StringBuilder t = new StringBuilder(); |
| t.append("update " + tmd.getName() + " set "); |
| t.append(setModifiedConditionally ? "MODIFIED = case when ? > MODIFIED then ? else MODIFIED end, " : "MODIFIED = ?, "); |
| t.append("HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = DSIZE + ?, "); |
| if (tmd.hasVersion()) { |
| t.append("VERSION = " + SCHEMAVERSION + ", "); |
| } |
| t.append("DATA = " + stringAppend.getStatementComponent() + " "); |
| t.append("where ID = ?"); |
| if (oldmodcount != null) { |
| t.append(" and MODCOUNT = ?"); |
| } |
| PreparedStatement stmt = connection.prepareStatement(t.toString()); |
| try { |
| int si = 1; |
| stmt.setObject(si++, modified, Types.BIGINT); |
| if (setModifiedConditionally) { |
| stmt.setObject(si++, modified, Types.BIGINT); |
| } |
| stmt.setObject(si++, hasBinaryAsNullOrInteger(hasBinary), Types.SMALLINT); |
| stmt.setObject(si++, deletedOnceAsNullOrInteger(deletedOnce), Types.SMALLINT); |
| stmt.setObject(si++, modcount, Types.BIGINT); |
| stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) : cmodcount, Types.BIGINT); |
| stmt.setObject(si++, appendDataWithComma.length(), Types.BIGINT); |
| si = stringAppend.setParameters(stmt, si); |
| setIdInStatement(tmd, stmt, si++, id); |
| |
| if (oldmodcount != null) { |
| stmt.setObject(si++, oldmodcount, Types.BIGINT); |
| } |
| int result = stmt.executeUpdate(); |
| if (result != 1) { |
| LOG.debug("DB append update failed for " + tmd.getName() + "/" + id + " with oldmodcount=" + oldmodcount); |
| } |
| return result == 1; |
| } finally { |
| stmt.close(); |
| } |
| } |
| |
| public int delete(Connection connection, RDBTableMetaData tmd, List<String> allIds) throws SQLException { |
| int count = 0; |
| |
| for (List<String> ids : Lists.partition(allIds, RDBJDBCTools.MAX_IN_CLAUSE)) { |
| PreparedStatement stmt; |
| PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", ids, tmd.isIdBinary()); |
| String sql = "delete from " + tmd.getName() + " where " + inClause.getStatementComponent(); |
| stmt = connection.prepareStatement(sql); |
| |
| try { |
| inClause.setParameters(stmt, 1); |
| int result = stmt.executeUpdate(); |
| if (result != ids.size()) { |
| LOG.debug("DB delete failed for " + tmd.getName() + "/" + ids); |
| } |
| count += result; |
| } finally { |
| stmt.close(); |
| } |
| } |
| |
| return count; |
| } |
| |
| public int delete(Connection connection, RDBTableMetaData tmd, Map<String, Long> toDelete) |
| throws SQLException { |
| PreparedStatement stmt = connection.prepareStatement("delete from " + tmd.getName() + " where ID=? and MODIFIED=?"); |
| try { |
| for (Entry<String, Long> entry : toDelete.entrySet()) { |
| setIdInStatement(tmd, stmt, 1, entry.getKey()); |
| stmt.setLong(2, entry.getValue()); |
| stmt.addBatch(); |
| } |
| int[] rets = stmt.executeBatch(); |
| int updatedRows = 0; |
| for (int ret : rets) { |
| if (ret >= 0) { |
| updatedRows += ret; |
| } |
| } |
| return updatedRows; |
| } finally { |
| stmt.close(); |
| } |
| } |
| |
| public int deleteWithCondition(Connection connection, RDBTableMetaData tmd, List<QueryCondition> conditions) |
| throws SQLException, DocumentStoreException { |
| |
| StringBuilder query = new StringBuilder("delete from " + tmd.getName()); |
| |
| String whereClause = buildWhereClause(null, null, null, conditions); |
| if (whereClause.length() != 0) { |
| query.append(" where ").append(whereClause); |
| } |
| |
| PreparedStatement stmt = connection.prepareStatement(query.toString()); |
| try { |
| int si = 1; |
| for (QueryCondition cond : conditions) { |
| if (cond.getOperands().size() != 1) { |
| throw new DocumentStoreException("unexpected condition: " + cond); |
| } |
| stmt.setLong(si++, (Long)cond.getOperands().get(0)); |
| } |
| return stmt.executeUpdate(); |
| } finally { |
| stmt.close(); |
| } |
| } |
| |
| public long determineServerTimeDifferenceMillis(Connection connection) { |
| String sql = this.dbInfo.getCurrentTimeStampInSecondsSyntax(); |
| |
| if (sql.isEmpty()) { |
| LOG.debug("{}: unsupported database, skipping DB server time check", this.dbInfo.toString()); |
| return 0; |
| } else { |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = connection.prepareStatement(sql); |
| long start = System.currentTimeMillis(); |
| rs = stmt.executeQuery(); |
| if (rs.next()) { |
| long roundtrip = System.currentTimeMillis() - start; |
| long serverTimeSec = rs.getInt(1); |
| long roundedTimeSec = ((start + roundtrip / 2) + 500) / 1000; |
| long resultSec = roundedTimeSec - serverTimeSec; |
| String message = String.format("instance timestamp: %d, DB timestamp: %d, difference: %d", roundedTimeSec, |
| serverTimeSec, resultSec); |
| if (Math.abs(resultSec) >= 2) { |
| LOG.info(message); |
| } else { |
| LOG.debug(message); |
| } |
| return resultSec * 1000; |
| } else { |
| throw new DocumentStoreException("failed to determine server timestamp"); |
| } |
| } catch (Exception ex) { |
| LOG.error("Trying to determine time difference to server", ex); |
| throw asDocumentStoreException(ex, "Trying to determine time difference to server"); |
| } finally { |
| closeResultSet(rs); |
| closeStatement(stmt); |
| } |
| } |
| } |
| |
| public <T extends Document> Set<String> insert(Connection connection, RDBTableMetaData tmd, List<T> documents) throws SQLException { |
| int actualSchema = tmd.hasSplitDocs() ? 2 : 1; |
| PreparedStatement stmt = connection.prepareStatement( |
| "insert into " + tmd.getName() + "(ID, MODIFIED, HASBINARY, DELETEDONCE, MODCOUNT, CMODCOUNT, DSIZE, " |
| + (tmd.hasVersion() ? "VERSION, " : "") |
| + (tmd.hasSplitDocs() ? "SDTYPE, SDMAXREVTIME, " : "") |
| + "DATA, BDATA) " + "values (?, ?, ?, ?, ?, ?, ?, " |
| + (tmd.hasVersion() ? (" " + actualSchema + ", ") : "") |
| + (tmd.hasSplitDocs() ? "?, ?, " : "") |
| + "?, ?)"); |
| |
| List<T> sortedDocs = sortDocuments(documents); |
| int[] results; |
| try { |
| for (T document : sortedDocs) { |
| String data = this.ser.asString(document, tmd.getColumnOnlyProperties()); |
| String id = document.getId(); |
| Number hasBinary = (Number) document.get(NodeDocument.HAS_BINARY_FLAG); |
| Boolean deletedOnce = (Boolean) document.get(NodeDocument.DELETED_ONCE); |
| Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT); |
| |
| int si = 1; |
| setIdInStatement(tmd, stmt, si++, id); |
| stmt.setObject(si++, document.get(MODIFIED), Types.BIGINT); |
| stmt.setObject(si++, hasBinaryAsNullOrInteger(hasBinary), Types.SMALLINT); |
| stmt.setObject(si++, deletedOnceAsNullOrInteger(deletedOnce), Types.SMALLINT); |
| stmt.setObject(si++, document.get(MODCOUNT), Types.BIGINT); |
| stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) : cmodcount, Types.BIGINT); |
| stmt.setObject(si++, data.length(), Types.BIGINT); |
| if (tmd.hasSplitDocs()) { |
| stmt.setObject(si++, document.get(NodeDocument.SD_TYPE)); |
| stmt.setObject(si++, document.get(NodeDocument.SD_MAX_REV_TIME_IN_SECS)); |
| } |
| if (data.length() < tmd.getDataLimitInOctets() / CHAR2OCTETRATIO) { |
| setDataInStatement(tmd, stmt, si++, data); |
| stmt.setBinaryStream(si++, null, 0); |
| } else { |
| setDataInStatement(tmd, stmt, si++, "\"blob\""); |
| byte[] bytes = asBytes(data); |
| stmt.setBytes(si++, bytes); |
| } |
| stmt.addBatch(); |
| } |
| results = stmt.executeBatch(); |
| } catch (BatchUpdateException ex) { |
| LOG.debug("Some of the batch updates failed", ex); |
| results = ex.getUpdateCounts(); |
| } finally { |
| stmt.close(); |
| } |
| Set<String> succesfullyInserted = new HashSet<String>(); |
| for (int i = 0; i < results.length; i++) { |
| int result = results[i]; |
| if (result != 1 && result != Statement.SUCCESS_NO_INFO) { |
| LOG.debug("DB insert failed for {}: {}", tmd.getName(), sortedDocs.get(i).getId()); |
| } else { |
| succesfullyInserted.add(sortedDocs.get(i).getId()); |
| } |
| } |
| return succesfullyInserted; |
| } |
| |
| /** |
| * Update a list of documents using JDBC batches. Some of the updates may fail because of the concurrent |
| * changes. The method returns a set of successfully updated documents. It's the caller responsibility |
| * to compare the set with the list of input documents, find out which documents conflicted and take |
| * appropriate action. |
| * <p> |
| * If the {@code upsert} parameter is set to true, the method will also try to insert new documents, those |
| * which modcount equals to 1. |
| * <p> |
| * The order of applying updates will be different than order of the passed list, so there shouldn't be two |
| * updates related to the same document. An {@link IllegalArgumentException} will be thrown if there are. |
| * |
| * @param connection JDBC connection |
| * @param tmd Table metadata |
| * @param documents List of documents to update |
| * @param upsert Insert new documents |
| * @return set containing ids of successfully updated documents |
| * @throws SQLException |
| */ |
| public <T extends Document> Set<String> update(Connection connection, RDBTableMetaData tmd, List<T> documents, boolean upsert) |
| throws SQLException { |
| assertNoDuplicatedIds(documents); |
| |
| Set<String> successfulUpdates = new HashSet<String>(); |
| List<String> updatedKeys = new ArrayList<String>(); |
| List<Long> modCounts = LOG.isTraceEnabled() ? new ArrayList<>() : null; |
| int[] batchResults = new int[0]; |
| |
| PreparedStatement stmt = connection.prepareStatement("update " + tmd.getName() |
| + " set MODIFIED = ?, HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = ?, DATA = ?, " |
| + (tmd.hasVersion() ? (" VERSION = " + SCHEMAVERSION + ", ") : "") + "BDATA = ? where ID = ? and MODCOUNT = ?"); |
| try { |
| boolean batchIsEmpty = true; |
| for (T document : sortDocuments(documents)) { |
| Long modcount = (Long) document.get(MODCOUNT); |
| if (INITIALMODCOUNT.equals(modcount)) { |
| continue; // This is a new document. We'll deal with the inserts later. |
| } |
| |
| String data = this.ser.asString(document, tmd.getColumnOnlyProperties()); |
| Number hasBinary = (Number) document.get(NodeDocument.HAS_BINARY_FLAG); |
| Boolean deletedOnce = (Boolean) document.get(NodeDocument.DELETED_ONCE); |
| Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT); |
| |
| int si = 1; |
| stmt.setObject(si++, document.get(MODIFIED), Types.BIGINT); |
| stmt.setObject(si++, hasBinaryAsNullOrInteger(hasBinary), Types.SMALLINT); |
| stmt.setObject(si++, deletedOnceAsNullOrInteger(deletedOnce), Types.SMALLINT); |
| stmt.setObject(si++, modcount, Types.BIGINT); |
| stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) : cmodcount, Types.BIGINT); |
| stmt.setObject(si++, data.length(), Types.BIGINT); |
| |
| if (data.length() < tmd.getDataLimitInOctets() / CHAR2OCTETRATIO) { |
| setDataInStatement(tmd, stmt, si++, data); |
| stmt.setBinaryStream(si++, null, 0); |
| } else { |
| setDataInStatement(tmd, stmt, si++, "\"blob\""); |
| byte[] bytes = asBytes(data); |
| stmt.setBytes(si++, bytes); |
| } |
| |
| setIdInStatement(tmd, stmt, si++, document.getId()); |
| stmt.setObject(si++, modcount - 1, Types.BIGINT); |
| stmt.addBatch(); |
| updatedKeys.add(document.getId()); |
| if (modCounts != null) { |
| modCounts.add(modcount); |
| } |
| |
| batchIsEmpty = false; |
| } |
| if (!batchIsEmpty) { |
| batchResults = stmt.executeBatch(); |
| connection.commit(); |
| } |
| } catch (BatchUpdateException ex) { |
| LOG.debug("Some of the batch updates failed", ex); |
| batchResults = ex.getUpdateCounts(); |
| } finally { |
| stmt.close(); |
| } |
| |
| if (!updatedKeys.isEmpty() && LOG.isTraceEnabled()) { |
| StringBuilder br = new StringBuilder(String.format("update: batch result on '%s' (sent: %d, received: %d):", tmd.getName(), |
| updatedKeys.size(), batchResults.length)); |
| String delim = " "; |
| for (int i = 0; i < batchResults.length; i++) { |
| br.append(delim).append(batchResults[i]); |
| if (i < updatedKeys.size()) { |
| br.append(String.format(" (for %s (%d))", updatedKeys.get(i), modCounts.get(i) - 1)); |
| } |
| delim = ", "; |
| } |
| LOG.trace(br.toString()); |
| } |
| |
| for (int i = 0; i < batchResults.length; i++) { |
| int result = batchResults[i]; |
| if (result == 1 || result == Statement.SUCCESS_NO_INFO) { |
| successfulUpdates.add(updatedKeys.get(i)); |
| } |
| } |
| |
| if (upsert) { |
| List<T> toBeInserted = new ArrayList<T>(documents.size()); |
| for (T doc : documents) { |
| if (INITIALMODCOUNT.equals(doc.get(MODCOUNT))) { |
| toBeInserted.add(doc); |
| } |
| } |
| |
| if (!toBeInserted.isEmpty()) { |
| for (String id : insert(connection, tmd, toBeInserted)) { |
| successfulUpdates.add(id); |
| } |
| } |
| } |
| return successfulUpdates; |
| } |
| |
| private static <T extends Document> void assertNoDuplicatedIds(List<T> documents) { |
| if (newHashSet(transform(documents, idExtractor)).size() < documents.size()) { |
| throw new IllegalArgumentException("There are duplicated ids in the document list"); |
| } |
| } |
| |
| @NotNull |
| public List<RDBRow> query(Connection connection, RDBTableMetaData tmd, String minId, String maxId, |
| List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit) throws SQLException { |
| long start = System.currentTimeMillis(); |
| List<RDBRow> result = new ArrayList<RDBRow>(); |
| long dataTotal = 0, bdataTotal = 0; |
| PreparedStatement stmt = null; |
| String fields; |
| if (tmd.hasSplitDocs()) { |
| fields = "ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, VERSION, SDTYPE, SDMAXREVTIME, DATA, BDATA"; |
| } else if (tmd.hasVersion()) { |
| fields = "ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, VERSION, DATA, BDATA"; |
| } else { |
| fields = "ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA"; |
| } |
| ResultSet rs = null; |
| try { |
| long pstart = PERFLOG.start(PERFLOG.isDebugEnabled() |
| ? ("querying: table=" + tmd.getName() + ", minId=" + minId + ", maxId=" + maxId + ", excludeKeyPatterns=" |
| + excludeKeyPatterns + ", conditions=" + conditions + ", limit=" + limit) |
| : null); |
| stmt = prepareQuery(connection, tmd, fields, minId, |
| maxId, excludeKeyPatterns, conditions, limit, "ID"); |
| rs = stmt.executeQuery(); |
| while (rs.next() && result.size() < limit) { |
| int field = 1; |
| String id = getIdFromRS(tmd, rs, field++); |
| |
| if ((minId != null && id.compareTo(minId) < 0) || (maxId != null && id.compareTo(maxId) > 0)) { |
| throw new DocumentStoreException( |
| "unexpected query result: '" + minId + "' < '" + id + "' < '" + maxId + "' - broken DB collation?"); |
| } |
| long modified = readLongFromResultSet(rs, field++); |
| long modcount = readLongFromResultSet(rs, field++); |
| long cmodcount = readLongFromResultSet(rs, field++); |
| Long hasBinary = readLongOrNullFromResultSet(rs, field++); |
| Boolean deletedOnce = readBooleanOrNullFromResultSet(rs, field++); |
| long schemaVersion = tmd.hasVersion() ? readLongFromResultSet(rs, field++) : 0; |
| long sdType = tmd.hasSplitDocs() ? readLongFromResultSet(rs, field++) : RDBRow.LONG_UNSET; |
| long sdMaxRevTime = tmd.hasSplitDocs() ? readLongFromResultSet(rs, field++) : RDBRow.LONG_UNSET; |
| String data = rs.getString(field++); |
| byte[] bdata = rs.getBytes(field++); |
| result.add(new RDBRow(id, hasBinary, deletedOnce, modified, modcount, cmodcount, schemaVersion, sdType, |
| sdMaxRevTime, data, bdata)); |
| dataTotal += data == null ? 0 : data.length(); |
| bdataTotal += bdata == null ? 0 : bdata.length; |
| PERFLOG.end(pstart, 10, "queried: table={} -> id={}, modcount={}, modified={}, data={}, bdata={}", tmd.getName(), id, |
| modcount, modified, (data == null ? 0 : data.length()), (bdata == null ? 0 : bdata.length)); |
| } |
| } finally { |
| closeStatement(stmt); |
| closeResultSet(rs); |
| } |
| |
| long elapsed = System.currentTimeMillis() - start; |
| |
| if ((this.queryHitsLimit != 0 && result.size() > this.queryHitsLimit) |
| || (this.queryTimeLimit != 0 && elapsed > this.queryTimeLimit)) { |
| |
| String params = String.format("params minid '%s' maxid '%s' excludeKeyPatterns %s conditions %s limit %d.", minId, |
| maxId, excludeKeyPatterns, conditions, limit); |
| |
| String resultRange = ""; |
| if (result.size() > 0) { |
| resultRange = String.format(" Result range: '%s'...'%s'.", result.get(0).getId(), |
| result.get(result.size() - 1).getId()); |
| } |
| |
| String postfix = String.format(" Read %d chars from DATA and %d bytes from BDATA. Check calling method.", dataTotal, |
| bdataTotal); |
| |
| if (this.queryHitsLimit != 0 && result.size() > this.queryHitsLimit) { |
| String message = String.format( |
| "Potentially excessive query on %s with %d hits (limited to %d, configured QUERYHITSLIMIT %d), elapsed time %dms, %s%s%s", |
| tmd.getName(), result.size(), limit, this.queryHitsLimit, elapsed, params, resultRange, postfix); |
| LOG.info(message, new Exception("call stack")); |
| } |
| |
| if (this.queryTimeLimit != 0 && elapsed > this.queryTimeLimit) { |
| String message = String.format( |
| "Long running query on %s with %d hits (limited to %d), elapsed time %dms (configured QUERYTIMELIMIT %d), %s%s%s", |
| tmd.getName(), result.size(), limit, elapsed, this.queryTimeLimit, params, resultRange, postfix); |
| LOG.info(message, new Exception("call stack")); |
| } |
| } |
| |
| return result; |
| } |
| |
| public long getLong(Connection connection, RDBTableMetaData tmd, String aggregate, String field, String minId, String maxId, |
| List<String> excludeKeyPatterns, List<QueryCondition> conditions) throws SQLException { |
| PreparedStatement stmt = null; |
| ResultSet rs = null; |
| long start = System.currentTimeMillis(); |
| long result = -1; |
| String selector = aggregate + "(" + ("*".equals(field) ? "*" : INDEXED_PROP_MAPPING.get(field)) + ")"; |
| try { |
| stmt = prepareQuery(connection, tmd, selector, minId, maxId, excludeKeyPatterns, conditions, Integer.MAX_VALUE, null); |
| rs = stmt.executeQuery(); |
| |
| result = rs.next() ? rs.getLong(1) : -1; |
| return result; |
| } finally { |
| closeStatement(stmt); |
| closeResultSet(rs); |
| if (LOG.isDebugEnabled()) { |
| long elapsed = System.currentTimeMillis() - start; |
| String params = String.format("params minid '%s' maxid '%s' excludeKeyPatterns %s conditions %s.", minId, maxId, |
| excludeKeyPatterns, conditions); |
| LOG.debug("Aggregate query " + selector + " on " + tmd.getName() + " with " + params + " -> " + result + ", took " |
| + elapsed + "ms"); |
| } |
| } |
| } |
| |
| @NotNull |
| public Iterator<RDBRow> queryAsIterator(RDBConnectionHandler ch, RDBTableMetaData tmd, String minId, String maxId, |
| List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit, String sortBy) throws SQLException { |
| return new ResultSetIterator(ch, tmd, minId, maxId, excludeKeyPatterns, conditions, limit, sortBy); |
| } |
| |
| private class ResultSetIterator implements Iterator<RDBRow>, Closeable { |
| |
| private RDBConnectionHandler ch; |
| private Connection connection; |
| private RDBTableMetaData tmd; |
| private PreparedStatement stmt; |
| private ResultSet rs; |
| private RDBRow next = null; |
| private Exception callstack = null; |
| private long elapsed = 0; |
| private String message = null; |
| private long cnt = 0; |
| private long pstart; |
| |
| public ResultSetIterator(RDBConnectionHandler ch, RDBTableMetaData tmd, String minId, String maxId, |
| List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit, String sortBy) throws SQLException { |
| long start = System.currentTimeMillis(); |
| try { |
| this.ch = ch; |
| this.connection = ch.getROConnection(); |
| this.tmd = tmd; |
| String fields; |
| if (tmd.hasSplitDocs()) { |
| fields = "ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, VERSION, SDTYPE, SDMAXREVTIME, DATA, BDATA"; |
| } else if (tmd.hasVersion()) { |
| fields = "ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, VERSION, DATA, BDATA"; |
| } else { |
| fields = "ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA"; |
| } |
| this.stmt = prepareQuery(connection, tmd, fields, minId, maxId, excludeKeyPatterns, conditions, limit, sortBy); |
| this.rs = stmt.executeQuery(); |
| this.next = internalNext(); |
| this.message = String.format("Query on %s with params minid '%s' maxid '%s' excludeKeyPatterns %s conditions %s.", |
| tmd.getName(), minId, maxId, excludeKeyPatterns, conditions); |
| if (LOG.isDebugEnabled()) { |
| callstack = new Exception("call stack"); |
| } |
| pstart = PERFLOG.start(PERFLOG.isDebugEnabled() |
| ? ("querying: table=" + tmd.getName() + ", minId=" + minId + ", maxId=" + maxId + ", excludeKeyPatterns=" |
| + excludeKeyPatterns + ", conditions=" + conditions + ", limit=" + limit + ", sortBy=" + sortBy) |
| : null); |
| } finally { |
| this.elapsed += (System.currentTimeMillis() - start); |
| } |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return next != null; |
| } |
| |
| @Override |
| public void remove() { |
| throw new RuntimeException("remove not supported"); |
| } |
| |
| @Override |
| public RDBRow next() { |
| RDBRow result = next; |
| if (next != null) { |
| next = internalNext(); |
| this.cnt += 1; |
| return result; |
| } else { |
| throw new NoSuchElementException("ResultSet exhausted"); |
| } |
| } |
| |
| private RDBRow internalNext() { |
| long start = System.currentTimeMillis(); |
| try { |
| if (this.rs.next()) { |
| int field = 1; |
| String id = getIdFromRS(this.tmd, this.rs, field++); |
| long modified = readLongFromResultSet(this.rs, field++); |
| long modcount = readLongFromResultSet(this.rs, field++); |
| long cmodcount = readLongFromResultSet(this.rs, field++); |
| Long hasBinary = readLongOrNullFromResultSet(this.rs, field++); |
| Boolean deletedOnce = readBooleanOrNullFromResultSet(this.rs, field++); |
| long schemaVersion = tmd.hasVersion() ? readLongFromResultSet(rs, field++) : 0; |
| long sdType = tmd.hasSplitDocs() ? readLongFromResultSet(rs, field++) : RDBRow.LONG_UNSET; |
| long sdMaxRevTime = tmd.hasSplitDocs() ? readLongFromResultSet(rs, field++) : RDBRow.LONG_UNSET; |
| String data = this.rs.getString(field++); |
| byte[] bdata = this.rs.getBytes(field++); |
| PERFLOG.end(pstart, 10, "queried: table={} -> id={}, modcount={}, modified={}, data={}, bdata={}", |
| tmd.getName(), id, modcount, modified, (data == null ? 0 : data.length()), |
| (bdata == null ? 0 : bdata.length)); |
| return new RDBRow(id, hasBinary, deletedOnce, modified, modcount, cmodcount, schemaVersion, sdType, |
| sdMaxRevTime, data, bdata); |
| } else { |
| this.rs = closeResultSet(this.rs); |
| this.stmt = closeStatement(this.stmt); |
| this.connection.commit(); |
| internalClose(); |
| return null; |
| } |
| } catch (SQLException ex) { |
| LOG.debug("iterating through result set", ex); |
| throw new RuntimeException(ex); |
| } finally { |
| this.elapsed += (System.currentTimeMillis() - start); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| internalClose(); |
| } |
| |
| @Override |
| public void finalize() throws Throwable { |
| try { |
| if (this.connection != null) { |
| if (this.callstack != null) { |
| LOG.error("finalizing unclosed " + this + "; check caller", this.callstack); |
| } else { |
| LOG.error("finalizing unclosed " + this); |
| } |
| } |
| } finally { |
| super.finalize(); |
| } |
| } |
| |
| private void internalClose() { |
| this.rs = closeResultSet(this.rs); |
| this.stmt = closeStatement(this.stmt); |
| this.ch.closeConnection(this.connection); |
| this.connection = null; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(this.message + " -> " + this.cnt + " results in " + elapsed + "ms"); |
| } |
| } |
| } |
| |
| @NotNull |
| private PreparedStatement prepareQuery(Connection connection, RDBTableMetaData tmd, String columns, String minId, String maxId, |
| List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit, String sortBy) throws SQLException { |
| |
| StringBuilder selectClause = new StringBuilder(); |
| |
| if (limit != Integer.MAX_VALUE && this.dbInfo.getFetchFirstSyntax() == FETCHFIRSTSYNTAX.TOP) { |
| selectClause.append("TOP " + limit + " "); |
| } |
| |
| selectClause.append(columns + " from " + tmd.getName()); |
| |
| String whereClause = buildWhereClause(minId, maxId, excludeKeyPatterns, conditions); |
| |
| StringBuilder query = new StringBuilder(); |
| query.append("select ").append(selectClause); |
| |
| if (whereClause.length() != 0) { |
| query.append(" where ").append(whereClause); |
| } |
| |
| if (sortBy != null) { |
| query.append(" order by ID"); |
| } |
| |
| if (limit != Integer.MAX_VALUE) { |
| switch (this.dbInfo.getFetchFirstSyntax()) { |
| case LIMIT: |
| query.append(" LIMIT " + limit); |
| break; |
| case FETCHFIRST: |
| query.append(" FETCH FIRST " + limit + " ROWS ONLY"); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| PreparedStatement stmt = connection.prepareStatement(query.toString()); |
| |
| int si = 1; |
| if (minId != null) { |
| setIdInStatement(tmd, stmt, si++, minId); |
| } |
| if (maxId != null) { |
| setIdInStatement(tmd, stmt, si++, maxId); |
| } |
| for (String keyPattern : excludeKeyPatterns) { |
| setIdInStatement(tmd, stmt, si++, keyPattern); |
| } |
| for (QueryCondition cond : conditions) { |
| for (Object o : cond.getOperands()) { |
| stmt.setObject(si++, o); |
| } |
| } |
| if (limit != Integer.MAX_VALUE) { |
| stmt.setFetchSize(limit); |
| } |
| return stmt; |
| } |
| |
| public List<RDBRow> read(Connection connection, RDBTableMetaData tmd, Collection<String> allKeys) throws SQLException { |
| |
| List<RDBRow> rows = new ArrayList<RDBRow>(); |
| |
| for (List<String> keys : Iterables.partition(allKeys, RDBJDBCTools.MAX_IN_CLAUSE)) { |
| long pstart = PERFLOG.start(PERFLOG.isDebugEnabled() ? ("reading: " + keys) : null); |
| |
| PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", keys, tmd.isIdBinary()); |
| StringBuilder query = new StringBuilder(); |
| if (tmd.hasSplitDocs()) { |
| query.append("select ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, VERSION, SDTYPE, SDMAXREVTIME, DATA, BDATA from "); |
| } else if (tmd.hasVersion()) { |
| query.append("select ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, VERSION, DATA, BDATA from "); |
| } else { |
| query.append("select ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA from "); |
| } |
| query.append(tmd.getName()); |
| query.append(" where ").append(inClause.getStatementComponent()); |
| |
| PreparedStatement stmt = connection.prepareStatement(query.toString()); |
| ResultSet rs = null; |
| stmt.setPoolable(false); |
| try { |
| inClause.setParameters(stmt, 1); |
| rs = stmt.executeQuery(); |
| |
| while (rs.next()) { |
| int field = 1; |
| String id = getIdFromRS(tmd, rs, field++); |
| long modified = readLongFromResultSet(rs, field++); |
| long modcount = readLongFromResultSet(rs, field++); |
| long cmodcount = readLongFromResultSet(rs, field++); |
| Long hasBinary = readLongOrNullFromResultSet(rs, field++); |
| Boolean deletedOnce = readBooleanOrNullFromResultSet(rs, field++); |
| long schemaVersion = tmd.hasVersion() ? readLongFromResultSet(rs, field++) : 0; |
| long sdType = tmd.hasSplitDocs() ? readLongFromResultSet(rs, field++) : RDBRow.LONG_UNSET; |
| long sdMaxRevTime = tmd.hasSplitDocs() ? readLongFromResultSet(rs, field++) : RDBRow.LONG_UNSET; |
| String data = rs.getString(field++); |
| byte[] bdata = rs.getBytes(field++); |
| RDBRow row = new RDBRow(id, hasBinary, deletedOnce, modified, modcount, cmodcount, schemaVersion, sdType, |
| sdMaxRevTime, data, bdata); |
| rows.add(row); |
| PERFLOG.end(pstart, 10, "read: table={}, id={} -> modcount={}, modified={}, data={}, bdata={}", tmd.getName(), id, |
| modcount, modified, (data == null ? 0 : data.length()), (bdata == null ? 0 : bdata.length)); |
| } |
| } catch (SQLException ex) { |
| LOG.debug("attempting to read " + keys, ex); |
| PERFLOG.end(pstart, 10, "read: table={} -> exception={}", tmd.getName(), ex.getMessage()); |
| |
| // DB2 throws an SQLException for invalid keys; handle this more |
| // gracefully |
| if ("22001".equals(ex.getSQLState())) { |
| try { |
| connection.rollback(); |
| } catch (SQLException ex2) { |
| LOG.debug("failed to rollback", ex2); |
| } |
| return null; |
| } else { |
| throw (ex); |
| } |
| } finally { |
| closeResultSet(rs); |
| closeStatement(stmt); |
| } |
| } |
| return rows; |
| } |
| |
| @Nullable |
| public RDBRow read(Connection connection, RDBTableMetaData tmd, String id, long lastmodcount, long lastmodified) throws SQLException { |
| |
| long pstart = PERFLOG.start(); |
| |
| boolean useCaseStatement = lastmodcount != -1 && lastmodified >= 1; |
| StringBuffer sql = new StringBuffer(); |
| String fields; |
| if (tmd.hasSplitDocs()) { |
| fields = "MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, VERSION, SDTYPE, SDMAXREVTIME, "; |
| } else if (tmd.hasVersion()) { |
| fields = "MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, VERSION, "; |
| } else { |
| fields = "MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, "; |
| } |
| |
| sql.append("select ").append(fields); |
| if (useCaseStatement) { |
| // the case statement causes the actual row data not to be |
| // sent in case we already have it |
| sql.append("case when (MODCOUNT = ? and MODIFIED = ?) then null else DATA end as DATA, "); |
| sql.append("case when (MODCOUNT = ? and MODIFIED = ?) then null else BDATA end as BDATA "); |
| } else { |
| // either we don't have a previous version of the document |
| // or the database does not support CASE in SELECT |
| sql.append("DATA, BDATA "); |
| } |
| sql.append("from " + tmd.getName() + " where ID = ?"); |
| PreparedStatement stmt = connection.prepareStatement(sql.toString()); |
| ResultSet rs = null; |
| |
| try { |
| int si = 1; |
| if (useCaseStatement) { |
| stmt.setLong(si++, lastmodcount); |
| stmt.setLong(si++, lastmodified); |
| stmt.setLong(si++, lastmodcount); |
| stmt.setLong(si++, lastmodified); |
| } |
| setIdInStatement(tmd, stmt, si, id); |
| |
| rs = stmt.executeQuery(); |
| if (rs.next()) { |
| int field = 1; |
| long modified = readLongFromResultSet(rs, field++); |
| long modcount = readLongFromResultSet(rs, field++); |
| long cmodcount = readLongFromResultSet(rs, field++); |
| Long hasBinary = readLongOrNullFromResultSet(rs, field++); |
| Boolean deletedOnce = readBooleanOrNullFromResultSet(rs, field++); |
| long schemaVersion = tmd.hasVersion() ? readLongFromResultSet(rs, field++) : 0; |
| long sdType = tmd.hasSplitDocs() ? readLongFromResultSet(rs, field++) : RDBRow.LONG_UNSET; |
| long sdMaxRevTime = tmd.hasSplitDocs() ? readLongFromResultSet(rs, field++) : RDBRow.LONG_UNSET; |
| String data = rs.getString(field++); |
| byte[] bdata = rs.getBytes(field++); |
| PERFLOG.end(pstart, 10, |
| "read: table={}, id={}, lastmodcount={}, lastmodified={} -> modcount={}, modified={}, data={}, bdata={}", |
| tmd.getName(), id, lastmodcount, lastmodified, modcount, modified, (data == null ? 0 : data.length()), |
| (bdata == null ? 0 : bdata.length)); |
| return new RDBRow(id, hasBinary, deletedOnce, modified, modcount, cmodcount, schemaVersion, sdType, sdMaxRevTime, |
| data, bdata); |
| } else { |
| PERFLOG.end(pstart, 10, "read: table={}, id={}, lastmodcount={}, lastmodified={} -> not found", tmd.getName(), id, |
| lastmodcount, lastmodified); |
| return null; |
| } |
| } catch (SQLException ex) { |
| LOG.debug("attempting to read " + id + " (id length is " + id.length() + ")", ex); |
| PERFLOG.end(pstart, 10, "read: table={}, id={}, lastmodcount={}, lastmodified={} -> exception={}", tmd.getName(), id, |
| lastmodcount, lastmodified, ex.getMessage()); |
| |
| // DB2 throws an SQLException for invalid keys; handle this more |
| // gracefully |
| if ("22001".equals(ex.getSQLState())) { |
| try { |
| connection.rollback(); |
| } catch (SQLException ex2) { |
| LOG.debug("failed to rollback", ex2); |
| } |
| return null; |
| } else { |
| throw (ex); |
| } |
| } finally { |
| closeResultSet(rs); |
| closeStatement(stmt); |
| } |
| } |
| |
| public boolean update(Connection connection, RDBTableMetaData tmd, String id, Long modified, Number hasBinary, |
| Boolean deletedOnce, Long modcount, Long cmodcount, Long oldmodcount, String data) throws SQLException { |
| |
| StringBuilder t = new StringBuilder(); |
| t.append("update " + tmd.getName() + " set "); |
| t.append("MODIFIED = ?, HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = ?, DATA = ?, " |
| + (tmd.hasVersion() ? (" VERSION = " + SCHEMAVERSION + ", ") : "") + "BDATA = ? "); |
| t.append("where ID = ?"); |
| if (oldmodcount != null) { |
| t.append(" and MODCOUNT = ?"); |
| } |
| PreparedStatement stmt = connection.prepareStatement(t.toString()); |
| try { |
| int si = 1; |
| stmt.setObject(si++, modified, Types.BIGINT); |
| stmt.setObject(si++, hasBinaryAsNullOrInteger(hasBinary), Types.SMALLINT); |
| stmt.setObject(si++, deletedOnceAsNullOrInteger(deletedOnce), Types.SMALLINT); |
| stmt.setObject(si++, modcount, Types.BIGINT); |
| stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) : cmodcount, Types.BIGINT); |
| stmt.setObject(si++, data.length(), Types.BIGINT); |
| |
| if (data.length() < tmd.getDataLimitInOctets() / CHAR2OCTETRATIO) { |
| setDataInStatement(tmd, stmt, si++, data); |
| stmt.setBinaryStream(si++, null, 0); |
| } else { |
| setDataInStatement(tmd, stmt, si++, "\"blob\""); |
| byte[] bytes = asBytes(data); |
| stmt.setBytes(si++, bytes); |
| } |
| |
| setIdInStatement(tmd, stmt, si++, id); |
| |
| if (oldmodcount != null) { |
| stmt.setObject(si++, oldmodcount, Types.BIGINT); |
| } |
| int result = stmt.executeUpdate(); |
| if (result != 1) { |
| LOG.debug("DB update failed for " + tmd.getName() + "/" + id + " with oldmodcount=" + oldmodcount); |
| } |
| return result == 1; |
| } finally { |
| stmt.close(); |
| } |
| } |
| |
| private final static Map<String, String> INDEXED_PROP_MAPPING; |
| static { |
| Map<String, String> tmp = new HashMap<String, String>(); |
| tmp.put(MODIFIED, "MODIFIED"); |
| tmp.put(NodeDocument.HAS_BINARY_FLAG, "HASBINARY"); |
| tmp.put(NodeDocument.DELETED_ONCE, "DELETEDONCE"); |
| tmp.put(COLLISIONSMODCOUNT, "CMODCOUNT"); |
| tmp.put(NodeDocument.SD_TYPE, "SDTYPE"); |
| tmp.put(NodeDocument.SD_MAX_REV_TIME_IN_SECS, "SDMAXREVTIME"); |
| tmp.put(RDBDocumentStore.VERSIONPROP, "VERSION"); |
| INDEXED_PROP_MAPPING = Collections.unmodifiableMap(tmp); |
| } |
| |
| private final static Set<String> SUPPORTED_OPS; |
| static { |
| Set<String> tmp = new HashSet<String>(); |
| tmp.add(">="); |
| tmp.add(">"); |
| tmp.add("<="); |
| tmp.add("<"); |
| tmp.add("="); |
| tmp.add("in"); |
| tmp.add("is null"); |
| tmp.add("is not null"); |
| tmp.add("null or <"); |
| SUPPORTED_OPS = Collections.unmodifiableSet(tmp); |
| } |
| |
| private static String buildWhereClause(String minId, String maxId, List<String> excludeKeyPatterns, List<QueryCondition> conditions) { |
| StringBuilder result = new StringBuilder(); |
| |
| String whereSep = ""; |
| if (minId != null) { |
| result.append("ID > ?"); |
| whereSep = " and "; |
| } |
| if (maxId != null) { |
| result.append(whereSep).append("ID < ?"); |
| whereSep = " and "; |
| } |
| if (excludeKeyPatterns != null && !excludeKeyPatterns.isEmpty()) { |
| result.append(whereSep); |
| whereSep = " and "; |
| result.append("not ("); |
| for (int i = 0; i < excludeKeyPatterns.size(); i++) { |
| result.append(i == 0 ? "" : " or "); |
| result.append("ID like ?"); |
| } |
| result.append(")"); |
| } |
| for (QueryCondition cond : conditions) { |
| String op = cond.getOperator(); |
| if (!SUPPORTED_OPS.contains(op)) { |
| throw new DocumentStoreException("unsupported operator: " + op); |
| } |
| String indexedProperty = cond.getPropertyName(); |
| String column = INDEXED_PROP_MAPPING.get(indexedProperty); |
| if (column != null) { |
| String realOperand = op; |
| boolean allowNull = false; |
| if (op.startsWith("null or ")) { |
| realOperand = op.substring("null or ".length()); |
| allowNull = true; |
| } |
| result.append(whereSep); |
| if (allowNull) { |
| result.append("(").append(column).append(" is null or "); |
| } |
| result.append(column).append(" ").append(realOperand); |
| |
| List<? extends Object> operands = cond.getOperands(); |
| if (operands.size() == 1) { |
| result.append(" ?"); |
| } else if (operands.size() > 1) { |
| result.append(" ("); |
| for (int i = 0; i < operands.size(); i++) { |
| result.append("?"); |
| if (i < operands.size() - 1) { |
| result.append(", "); |
| } |
| } |
| result.append(") "); |
| } |
| if (allowNull) { |
| result.append(")"); |
| } |
| whereSep = " and "; |
| } else { |
| throw new DocumentStoreException("unsupported indexed property: " + indexedProperty); |
| } |
| } |
| return result.toString(); |
| } |
| |
| private static String getIdFromRS(RDBTableMetaData tmd, ResultSet rs, int idx) throws SQLException { |
| if (tmd.isIdBinary()) { |
| try { |
| return new String(rs.getBytes(idx), "UTF-8"); |
| } catch (UnsupportedEncodingException ex) { |
| LOG.error("UTF-8 not supported", ex); |
| throw asDocumentStoreException(ex, "UTF-8 not supported"); |
| } |
| } else { |
| return rs.getString(idx); |
| } |
| } |
| |
| private static void setIdInStatement(RDBTableMetaData tmd, PreparedStatement stmt, int idx, String id) throws SQLException { |
| try { |
| if (tmd.isIdBinary()) { |
| stmt.setBytes(idx, UTF8Encoder.encodeAsByteArray(id)); |
| } else { |
| if (!UTF8Encoder.canEncode(id)) { |
| throw new IOException("can not encode as UTF-8"); |
| } |
| stmt.setString(idx, id); |
| } |
| } catch (IOException ex) { |
| LOG.warn("Invalid ID: " + id, ex); |
| throw asDocumentStoreException(ex, "Invalid ID: " + id); |
| } |
| } |
| |
| private static void setDataInStatement(RDBTableMetaData tmd, PreparedStatement stmt, int idx, String id) throws SQLException { |
| if (tmd.isDataNChar()) { |
| stmt.setNString(idx, id); |
| } else { |
| stmt.setString(idx, id); |
| } |
| } |
| |
| private static long readLongFromResultSet(ResultSet res, int index) throws SQLException { |
| long v = res.getLong(index); |
| return res.wasNull() ? RDBRow.LONG_UNSET : v; |
| } |
| |
| @Nullable |
| private static Boolean readBooleanOrNullFromResultSet(ResultSet res, int index) throws SQLException { |
| long v = res.getLong(index); |
| return res.wasNull() ? null : Boolean.valueOf(v != 0); |
| } |
| |
| @Nullable |
| private static Long readLongOrNullFromResultSet(ResultSet res, int index) throws SQLException { |
| long v = res.getLong(index); |
| return res.wasNull() ? null : Long.valueOf(v); |
| } |
| |
| private static final Integer INT_FALSE = 0; |
| private static final Integer INT_TRUE = 1; |
| |
| @Nullable |
| private static Integer deletedOnceAsNullOrInteger(Boolean b) { |
| return b == null ? null : (b.booleanValue() ? INT_TRUE : INT_FALSE); |
| } |
| |
| @Nullable |
| private static Integer hasBinaryAsNullOrInteger(Number n) { |
| return n == null ? null : (n.longValue() == 1 ? INT_TRUE : INT_FALSE); |
| } |
| |
| private static <T extends Document> List<T> sortDocuments(Collection<T> documents) { |
| List<T> result = new ArrayList<T>(documents); |
| Collections.sort(result, new Comparator<T>() { |
| @Override |
| public int compare(T o1, T o2) { |
| return Strings.nullToEmpty(o1.getId()).compareTo(Strings.nullToEmpty(o2.getId())); |
| } |
| }); |
| return result; |
| } |
| |
| private static final Function<Document, String> idExtractor = new Function<Document, String>() { |
| @Override |
| public String apply(Document input) { |
| return input.getId(); |
| } |
| }; |
| } |