| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.jackrabbit.oak.plugins.document.rdb; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.collect.Lists.newArrayList; |
| import static com.google.common.collect.Lists.partition; |
| import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.checkConditions; |
| import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.asDocumentStoreException; |
| import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet; |
| import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeStatement; |
| import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.createTableName; |
| import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getModuleVersion; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.UnsupportedEncodingException; |
| import java.sql.Connection; |
| import java.sql.DatabaseMetaData; |
| import java.sql.ResultSet; |
| import java.sql.ResultSetMetaData; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.sql.Types; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Lock; |
| import java.util.zip.Deflater; |
| import java.util.zip.GZIPOutputStream; |
| |
| import javax.sql.DataSource; |
| |
| import org.apache.jackrabbit.oak.cache.CacheStats; |
| import org.apache.jackrabbit.oak.cache.CacheValue; |
| import org.apache.jackrabbit.oak.plugins.document.Collection; |
| import org.apache.jackrabbit.oak.plugins.document.Document; |
| import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder; |
| import org.apache.jackrabbit.oak.plugins.document.DocumentStore; |
| import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException; |
| import org.apache.jackrabbit.oak.plugins.document.DocumentStoreStatsCollector; |
| import org.apache.jackrabbit.oak.plugins.document.NodeDocument; |
| import org.apache.jackrabbit.oak.plugins.document.UpdateOp; |
| import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key; |
| import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation; |
| import org.apache.jackrabbit.oak.plugins.document.UpdateUtils; |
| import org.apache.jackrabbit.oak.plugins.document.cache.CacheChangesTracker; |
| import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; |
| import org.apache.jackrabbit.oak.plugins.document.cache.ModificationStamp; |
| import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache; |
| import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks; |
| import org.apache.jackrabbit.oak.plugins.document.locks.StripedNodeDocumentLocks; |
| import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore; |
| import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterator; |
| import org.apache.jackrabbit.oak.plugins.document.util.SystemPropertySupplier; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Function; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.base.Strings; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| /** |
| * Implementation of {@link DocumentStore} for relational databases. |
| * |
| * <h3 id="apidocs.supported-databases">Supported Databases</h3> |
| * <p> |
| * The code is supposed to be sufficiently generic to run with a variety of |
| * database implementations. However, the tables are created when required to |
| * simplify testing, and <em>that</em> code specifically supports these |
| * databases: |
| * <ul> |
| * <li>H2DB</li> |
| * <li>Apache Derby</li> |
| * <li>IBM DB2</li> |
| * <li>PostgreSQL</li> |
| * <li>MariaDB (MySQL)</li> |
| * <li>Microsoft SQL Server</li> |
| * <li>Oracle</li> |
| * </ul> |
| * |
| * <h3 id="apidocs.table-layout">Table Layout</h3> |
| * <p> |
| * Data for each of the DocumentStore's {@link Collection}s is stored in its own |
| * database table (with a name matching the collection). |
| * <p> |
| * The tables essentially implement key/value storage, where the key usually is |
| * derived from an Oak path, and the value is a serialization of a |
| * {@link Document} (or a part of one). Additional fields are used for queries, |
| * debugging, and concurrency control: |
| * <table style="text-align: left;"> |
| * <caption></caption> |
| * <thead> |
| * <tr> |
| * <th>Column</th> |
| * <th>Type</th> |
| * <th>Description</th> |
| * </tr> |
| * </thead> <tbody> |
| * <tr> |
| * <th>ID</th> |
| * <td>varchar(512) not null primary key</td> |
| * <td>The document's key (for databases that can not handle 512 character |
| * primary keys, such as MySQL, varbinary is possible as well).</td> |
| * </tr> |
| * <tr> |
| * <th>MODIFIED</th> |
| * <td>bigint</td> |
| * <td>Low-resolution timestamp. |
| * </tr> |
| * <tr> |
| * <th>HASBINARY</th> |
| * <td>smallint</td> |
| * <td>Flag indicating whether the document has binary properties. |
| * </tr> |
| * <tr> |
| * <th>DELETEDONCE</th> |
| * <td>smallint</td> |
| * <td>Flag indicating whether the document has been deleted once. |
| * </tr> |
| * <tr> |
| * <th>MODCOUNT</th> |
| * <td>bigint</td> |
| * <td>Modification counter, used for avoiding overlapping updates.</td> |
| * </tr> |
| * <tr> |
| * <th>DSIZE</th> |
| * <td>bigint</td> |
| * <td>The approximate size of the document's JSON serialization (for debugging |
| * purposes).</td> |
| * </tr> |
| * <tr> |
| * <th>VERSION</th> |
| * <td>smallint</td> |
| * <td>The schema version the code writing to a row (or inserting it) was aware |
| * of (introduced with schema version 1). Not set for rows written by version 0 |
| * client code.</td> |
| * </tr> |
| * <tr> |
| * <th>SDTYPE</th> |
| * <td>smallint</td> |
| * <td>Split Document type.</td> |
| * </tr> |
| * <tr> |
| * <th>SDMAXREVTIME</th> |
| * <td>bigint</td> |
| * <td>Split document max revision time..</td> |
| * </tr> |
| * <tr> |
| * <th>DATA</th> |
| * <td>varchar(16384)</td> |
| * <td>The document's JSON serialization (only used for small document sizes, in |
| * which case BDATA (below) is not set), or a sequence of JSON serialized update |
| * operations to be applied against the last full serialization.</td> |
| * </tr> |
| * <tr> |
| * <th>BDATA</th> |
| * <td>blob</td> |
| * <td>The document's JSON serialization (usually GZIPped, only used for "large" |
| * documents).</td> |
| * </tr> |
| * </tbody> |
| * </table> |
| * <p> |
| * The names of database tables can be prefixed; the purpose is mainly for |
| * testing, as tables can also be dropped automatically when the store is |
| * disposed (this only happens for those tables that have been created on |
| * demand). |
| * <h4 id="apidocs.versioning">Versioning</h4> |
| * <p> |
| * The initial database layout used in OAK 1.0 through 1.6 is version 0. |
| * <p> |
| * Version 1 introduces an additional "version" column, which records the schema |
| * version of the code writing to the database (upon insert and update). This is |
| * in preparation of future layout changes which might introduce new columns. |
| * <p> |
| * Version 2 introduces an additional "sdtype" and "sdmaxrevtime". |
| * <p> |
| * The code deals with both version 0, version 1 and version 2 table layouts. By |
| * default, it tries to create version 2 tables, and also tries to upgrade |
| * existing version 0 and 1 tables to version 2. |
| * <h4>DB-specific information</h4> |
| * <p> |
| * Databases need to be configured so that: |
| * <ul> |
| * <li>Text fields support all Unicode code points,</li> |
| * <li>Collation of text fields happens by Unicode code point,</li> |
| * <li>and BLOBs need to support at least 16 MB.</li> |
| * </ul> |
| * <p> |
| * See the |
| * <a href="https://jackrabbit.apache.org/oak/docs/nodestore/document/rdb-document-store.html#database-creation">RDBDocumentStore documentation</a> |
| * for more information. |
| * <h3 id="apidocs.table-creation">Table Creation</h3> |
| * <p> |
| * The code tries to create the tables when they are not present. Likewise, it |
| * tries to upgrade to a newer schema when needed. |
| * <p> |
| * Users/Administrators who prefer to stay in control over table generation can |
| * create them "manually". The oak-run "<a href="https://jackrabbit.apache.org/oak/docs/nodestore/document/rdb-document-store.html#rdbddldump"><code>rdbddldump</code></a>" |
| * command can be used to print out the DDL statements that would have been used for auto-creation |
| * and/or automatic schema updates. |
| * |
| * <h3>Caching</h3> |
| * <p> |
| * The cache borrows heavily from the {@link MongoDocumentStore} implementation. |
| * |
| * <h3>Queries</h3> |
| * <p> |
| * The implementation currently supports only three indexed properties: "_bin", |
| * "deletedOnce", and "_modified". Attempts to use a different indexed property |
| * will cause a {@link DocumentStoreException}. |
| */ |
| public class RDBDocumentStore implements DocumentStore { |
| |
| /** |
| * Creates a {@linkplain RDBDocumentStore} instance using the provided |
| * {@link DataSource}, {@link DocumentNodeStoreBuilder}, and {@link RDBOptions}. |
| */ |
| public RDBDocumentStore(DataSource ds, DocumentNodeStoreBuilder<?> builder, RDBOptions options) { |
| try { |
| initialize(ds, builder, options); |
| } catch (Exception ex) { |
| throw asDocumentStoreException(ex, "initializing RDB document store"); |
| } |
| } |
| |
| /** |
| * Creates a {@linkplain RDBDocumentStore} instance using the provided |
| * {@link DataSource}, {@link DocumentNodeStoreBuilder}, and default |
| * {@link RDBOptions}. |
| */ |
| public RDBDocumentStore(DataSource ds, DocumentNodeStoreBuilder<?> builder) { |
| this(ds, builder, new RDBOptions()); |
| } |
| |
| @Override |
| public <T extends Document> T find(Collection<T> collection, String id) { |
| return find(collection, id, Integer.MAX_VALUE); |
| } |
| |
| @Override |
| public <T extends Document> T find(final Collection<T> collection, final String id, int maxCacheAge) { |
| return readDocumentCached(collection, id, maxCacheAge); |
| } |
| |
| @NotNull |
| @Override |
| public <T extends Document> List<T> query(Collection<T> collection, String fromKey, String toKey, int limit) { |
| return query(collection, fromKey, toKey, null, 0, limit); |
| } |
| |
| @NotNull |
| @Override |
| public <T extends Document> List<T> query(Collection<T> collection, String fromKey, String toKey, String indexedProperty, |
| long startValue, int limit) { |
| List<QueryCondition> conditions = Collections.emptyList(); |
| if (indexedProperty != null) { |
| conditions = Collections.singletonList(new QueryCondition(indexedProperty, ">=", startValue)); |
| } |
| return internalQuery(collection, fromKey, toKey, EMPTY_KEY_PATTERN, conditions, limit); |
| } |
| |
| @NotNull |
| protected <T extends Document> List<T> query(Collection<T> collection, String fromKey, String toKey, |
| List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit) { |
| return internalQuery(collection, fromKey, toKey, excludeKeyPatterns, conditions, limit); |
| } |
| |
| @Override |
| public <T extends Document> void remove(Collection<T> collection, String id) { |
| try { |
| delete(collection, id); |
| } finally { |
| invalidateCache(collection, id, true); |
| } |
| } |
| |
| @Override |
| public <T extends Document> void remove(Collection<T> collection, List<String> ids) { |
| try { |
| delete(collection, ids); |
| } finally { |
| for (String id : ids) { |
| invalidateCache(collection, id, true); |
| } |
| } |
| } |
| |
| @Override |
| public <T extends Document> int remove(Collection<T> collection, Map<String, Long> toRemove) { |
| try { |
| return delete(collection, toRemove); |
| } finally { |
| for (String id : toRemove.keySet()) { |
| invalidateCache(collection, id, true); |
| } |
| } |
| } |
| |
| @Override |
| public <T extends Document> int remove(Collection<T> collection, String indexedProperty, long startValue, long endValue) |
| throws DocumentStoreException { |
| try { |
| List<QueryCondition> conditions = new ArrayList<QueryCondition>(); |
| conditions.add(new QueryCondition(indexedProperty, ">", startValue)); |
| conditions.add(new QueryCondition(indexedProperty, "<", endValue)); |
| return deleteWithCondition(collection, conditions); |
| } finally { |
| if (collection == Collection.NODES) { |
| // this method is currently being used only for Journal |
| // collection while GC. But, to keep sanctity of the API, we |
| // need to acknowledge that Nodes collection could've been used. |
| // But, in this signature, there's no useful way to invalidate |
| // cache. |
| // So, we use the hammer for this task |
| invalidateCache(); |
| } |
| } |
| } |
| |
| @Override |
| public <T extends Document> boolean create(Collection<T> collection, List<UpdateOp> updateOps) { |
| return internalCreate(collection, updateOps); |
| } |
| |
| @Override |
| public <T extends Document> T createOrUpdate(Collection<T> collection, UpdateOp update) { |
| UpdateUtils.assertUnconditional(update); |
| return internalCreateOrUpdate(collection, update, update.isNew(), false, RETRIES); |
| } |
| |
| @Override |
| public <T extends Document> List<T> createOrUpdate(Collection<T> collection, List<UpdateOp> updateOps) { |
| // fall back to sequential mode if batches are turned off using system |
| // property, or the number of update operations is small |
| if (!BATCHUPDATES || updateOps.size() < MINIMALBULKUPDATESIZE) { |
| List<T> results = new ArrayList<T>(updateOps.size()); |
| for (UpdateOp update : updateOps) { |
| results.add(createOrUpdate(collection, update)); |
| } |
| return results; |
| } else { |
| return internalCreateOrUpdate(collection, updateOps); |
| } |
| } |
| |
| private static int MINIMALBULKUPDATESIZE = 3; |
| |
| private <T extends Document> List<T> internalCreateOrUpdate(Collection<T> collection, List<UpdateOp> updateOps) { |
| final Stopwatch watch = startWatch(); |
| Map<UpdateOp, T> results = new LinkedHashMap<UpdateOp, T>(); |
| Map<String, UpdateOp> operationsToCover = new LinkedHashMap<String, UpdateOp>(); |
| Set<UpdateOp> duplicates = new HashSet<UpdateOp>(); |
| |
| for (UpdateOp updateOp : updateOps) { |
| UpdateUtils.assertUnconditional(updateOp); |
| if (operationsToCover.containsKey(updateOp.getId())) { |
| duplicates.add(updateOp); |
| results.put(updateOp, null); |
| } else { |
| UpdateOp clone = updateOp.copy(); |
| addUpdateCounters(clone); |
| operationsToCover.put(clone.getId(), clone); |
| results.put(clone, null); |
| } |
| } |
| |
| Map<String, T> oldDocs = new HashMap<String, T>(); |
| if (collection == Collection.NODES) { |
| oldDocs.putAll(readDocumentCached(collection, operationsToCover.keySet())); |
| } |
| |
| int i = 0; // iteration count |
| |
| while (operationsToCover.size() >= MINIMALBULKUPDATESIZE) { |
| // We should try to insert documents only during the first |
| // iteration. In the 2nd and 3rd iterations we only deal with |
| // conflicting documents, so they already exist in the database |
| // and there's no point in inserting them. |
| boolean upsert = i == 0; |
| |
| if (i++ == 3) { |
| // operations that conflicted in 3 consecutive bulk |
| // updates should be applied sequentially |
| break; |
| } |
| |
| for (List<UpdateOp> partition : partition(newArrayList(operationsToCover.values()), CHUNKSIZE)) { |
| Map<UpdateOp, T> successfulUpdates = bulkUpdate(collection, partition, oldDocs, upsert); |
| results.putAll(successfulUpdates); |
| operationsToCover.values().removeAll(successfulUpdates.keySet()); |
| } |
| } |
| |
| // if there are some changes left, we'll apply them one after another |
| for (UpdateOp updateOp : updateOps) { |
| UpdateOp conflictedOp = operationsToCover.remove(updateOp.getId()); |
| if (conflictedOp != null) { |
| if (collection == Collection.NODES) { |
| LOG.debug("createOrUpdate: update conflict on {}, invalidating cache and retrying...", updateOp.getId()); |
| nodesCache.invalidate(updateOp.getId()); |
| } else { |
| LOG.debug("createOrUpdate: update conflict on {}, retrying...", updateOp.getId()); |
| } |
| results.put(conflictedOp, createOrUpdate(collection, updateOp)); |
| } else if (duplicates.contains(updateOp)) { |
| results.put(updateOp, createOrUpdate(collection, updateOp)); |
| } |
| } |
| stats.doneCreateOrUpdate(watch.elapsed(TimeUnit.NANOSECONDS), |
| collection, Lists.transform(updateOps, new Function<UpdateOp, String>() { |
| @Override |
| public String apply(UpdateOp input) { |
| return input.getId(); |
| } |
| })); |
| return new ArrayList<T>(results.values()); |
| } |
| |
| private <T extends Document> Map<String, T> readDocumentCached(Collection<T> collection, Set<String> keys) { |
| Map<String, T> documents = new HashMap<String, T>(); |
| |
| if (collection == Collection.NODES) { |
| for (String key : keys) { |
| NodeDocument cached = nodesCache.getIfPresent(key); |
| if (cached != null && cached != NodeDocument.NULL) { |
| T doc = castAsT(unwrap(cached)); |
| documents.put(doc.getId(), doc); |
| } |
| } |
| } |
| |
| Set<String> documentsToRead = Sets.difference(keys, documents.keySet()); |
| Map<String, T> readDocuments = readDocumentsUncached(collection, documentsToRead); |
| documents.putAll(readDocuments); |
| |
| if (collection == Collection.NODES) { |
| for (T doc : readDocuments.values()) { |
| nodesCache.putIfAbsent((NodeDocument) doc); |
| } |
| } |
| |
| return documents; |
| } |
| |
| private <T extends Document> Map<String, T> readDocumentsUncached(Collection<T> collection, Set<String> keys) { |
| Map<String, T> result = new HashMap<String, T>(); |
| |
| Connection connection = null; |
| RDBTableMetaData tmd = getTable(collection); |
| try { |
| connection = this.ch.getROConnection(); |
| List<RDBRow> rows = db.read(connection, tmd, keys); |
| |
| int size = rows.size(); |
| for (int i = 0; i < size; i++) { |
| RDBRow row = rows.set(i, null); |
| T document = convertFromDBObject(collection, row); |
| result.put(document.getId(), document); |
| } |
| connection.commit(); |
| } catch (Exception ex) { |
| throw asDocumentStoreException(ex, "trying to read: " + keys); |
| } finally { |
| this.ch.closeConnection(connection); |
| } |
| return result; |
| } |
| |
| @Nullable |
| private <T extends Document> CacheChangesTracker obtainTracker(Collection<T> collection, Set<String> keys) { |
| if (collection == Collection.NODES) { |
| return this.nodesCache.registerTracker(keys); |
| } else { |
| return null; |
| } |
| } |
| |
| @Nullable |
| private <T extends Document> CacheChangesTracker obtainTracker(Collection<T> collection, String fromKey, String toKey) { |
| if (collection == Collection.NODES) { |
| return this.nodesCache.registerTracker(fromKey, toKey); |
| } else { |
| return null; |
| } |
| } |
| |
| private <T extends Document> Map<UpdateOp, T> bulkUpdate(Collection<T> collection, List<UpdateOp> updates, Map<String, T> oldDocs, boolean upsert) { |
| Set<String> missingDocs = new HashSet<String>(); |
| for (UpdateOp op : updates) { |
| if (!oldDocs.containsKey(op.getId())) { |
| missingDocs.add(op.getId()); |
| } |
| } |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("bulkUpdate: cached docs to be updated: {}", dumpKeysAndModcounts(oldDocs)); |
| } |
| |
| Map<String, T> freshDocs = readDocumentsUncached(collection, missingDocs); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("bulkUpdate: fresh docs to be updated: {}", dumpKeysAndModcounts(freshDocs)); |
| } |
| oldDocs.putAll(freshDocs); |
| |
| try (CacheChangesTracker tracker = obtainTracker(collection, Sets.union(oldDocs.keySet(), missingDocs) )) { |
| List<T> docsToUpdate = new ArrayList<T>(updates.size()); |
| Set<String> keysToUpdate = new HashSet<String>(); |
| for (UpdateOp update : updates) { |
| String id = update.getId(); |
| T modifiedDoc = collection.newDocument(this); |
| T oldDoc = oldDocs.get(id); |
| if (oldDoc != null) { |
| oldDoc.deepCopy(modifiedDoc); |
| } |
| UpdateUtils.applyChanges(modifiedDoc, update); |
| if (oldDoc != null || update.isNew()) { |
| // only create if updateOp allows it |
| docsToUpdate.add(modifiedDoc); |
| } |
| keysToUpdate.add(id); |
| } |
| |
| Connection connection = null; |
| RDBTableMetaData tmd = getTable(collection); |
| try { |
| connection = this.ch.getRWConnection(); |
| Set<String> successfulUpdates = db.update(connection, tmd, docsToUpdate, upsert); |
| connection.commit(); |
| |
| Set<String> failedUpdates = Sets.difference(keysToUpdate, successfulUpdates); |
| oldDocs.keySet().removeAll(failedUpdates); |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("bulkUpdate: success for {}, failure for {}", successfulUpdates, failedUpdates); |
| } |
| |
| if (collection == Collection.NODES) { |
| List<NodeDocument> docsToCache = new ArrayList<>(); |
| for (T doc : docsToUpdate) { |
| if (successfulUpdates.contains(doc.getId())) { |
| docsToCache.add((NodeDocument) doc); |
| } |
| } |
| nodesCache.putNonConflictingDocs(tracker, docsToCache); |
| } |
| |
| Map<UpdateOp, T> result = new HashMap<UpdateOp, T>(); |
| for (UpdateOp op : updates) { |
| if (successfulUpdates.contains(op.getId())) { |
| result.put(op, oldDocs.get(op.getId())); |
| } |
| } |
| return result; |
| } catch (SQLException ex) { |
| this.ch.rollbackConnection(connection); |
| throw handleException("update failed for: " + keysToUpdate, ex, collection, keysToUpdate); |
| } finally { |
| this.ch.closeConnection(connection); |
| } |
| } |
| } |
| |
| @Override |
| public <T extends Document> T findAndUpdate(Collection<T> collection, UpdateOp update) { |
| return internalCreateOrUpdate(collection, update, false, true, RETRIES); |
| } |
| |
| @Override |
| public CacheInvalidationStats invalidateCache() { |
| for (CacheValue key : nodesCache.keys()) { |
| invalidateCache(Collection.NODES, key.toString()); |
| } |
| return null; |
| } |
| |
| @Override |
| public CacheInvalidationStats invalidateCache(Iterable<String> keys) { |
| for (String key : keys) { |
| invalidateCache(Collection.NODES, key); |
| } |
| return null; |
| } |
| |
| @Override |
| public <T extends Document> void invalidateCache(Collection<T> collection, String id) { |
| invalidateCache(collection, id, false); |
| } |
| |
| private <T extends Document> void invalidateCache(Collection<T> collection, String id, boolean remove) { |
| if (collection == Collection.NODES) { |
| invalidateNodesCache(id, remove); |
| } |
| } |
| |
| private void invalidateNodesCache(String id, boolean remove) { |
| try (CacheLock lock = acquireLockFor(id)) { |
| if (remove) { |
| nodesCache.invalidate(id); |
| } else { |
| nodesCache.markChanged(id); |
| NodeDocument entry = nodesCache.getIfPresent(id); |
| if (entry != null) { |
| entry.markUpToDate(0); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public long determineServerTimeDifferenceMillis() { |
| Connection connection = null; |
| try { |
| connection = this.ch.getROConnection(); |
| long result = this.db.determineServerTimeDifferenceMillis(connection); |
| connection.commit(); |
| return result; |
| } catch (SQLException ex) { |
| LOG.error("Trying to determine time difference to server", ex); |
| throw asDocumentStoreException(ex, "Trying to determine time difference to server"); |
| } finally { |
| this.ch.closeConnection(connection); |
| } |
| } |
| |
| // used for diagnostics |
| private String droppedTables = ""; |
| |
| public String getDroppedTables() { |
| return this.droppedTables; |
| } |
| |
| // table names |
| private static Map<Collection<? extends Document>, String> TABLEMAP; |
| private static List<String> TABLENAMES; |
| static { |
| Map<Collection<? extends Document>, String> tmp = new HashMap<Collection<? extends Document>, String>(); |
| tmp.put(Collection.CLUSTER_NODES, "CLUSTERNODES"); |
| tmp.put(Collection.JOURNAL, "JOURNAL"); |
| tmp.put(Collection.NODES, "NODES"); |
| tmp.put(Collection.SETTINGS, "SETTINGS"); |
| TABLEMAP = Collections.unmodifiableMap(tmp); |
| List<String> tl = new ArrayList<String>(TABLEMAP.values()); |
| Collections.sort(tl); |
| TABLENAMES = Collections.unmodifiableList(tl); |
| } |
| |
| public static List<String> getTableNames() { |
| return TABLENAMES; |
| } |
| |
| /** |
| * Holds the data about a table that can vary: name, whether the primary key |
| * is binary, and the estimated size of the "data" column. |
| */ |
| static class RDBTableMetaData { |
| |
| private final String catalog; |
| private final String name; |
| private boolean idIsBinary = false; |
| private boolean dataIsNChar = false; |
| private boolean hasVersion = false; |
| private boolean hasSplitDocs = false; |
| private int dataLimitInOctets = 16384; |
| private String schemaInfo = ""; |
| private String indexInfo = ""; |
| private Set<String> columnOnlyProperties = Collections.unmodifiableSet(COLUMNPROPERTIES); |
| private Set<String> columnProperties = Collections.unmodifiableSet(COLUMNPROPERTIES); |
| |
| public RDBTableMetaData(@Nullable String catalog, @NotNull String name) { |
| this.catalog = catalog == null ? "" : catalog; |
| this.name = name; |
| } |
| |
| public int getDataLimitInOctets() { |
| return this.dataLimitInOctets; |
| } |
| |
| public String getCatalog() { |
| return this.catalog; |
| } |
| |
| public Set<String> getColumnProperties() { |
| return this.columnProperties; |
| } |
| |
| public Set<String> getColumnOnlyProperties() { |
| return this.columnOnlyProperties; |
| } |
| |
| public String getIndexInfo() { |
| return this.indexInfo; |
| } |
| |
| public String getName() { |
| return this.name; |
| } |
| |
| public String getSchemaInfo() { |
| return this.schemaInfo; |
| } |
| |
| public boolean isDataNChar() { |
| return this.dataIsNChar; |
| } |
| |
| public boolean isIdBinary() { |
| return this.idIsBinary; |
| } |
| |
| public boolean hasSplitDocs() { |
| return this.hasSplitDocs; |
| } |
| |
| public boolean hasVersion() { |
| return this.hasVersion; |
| } |
| |
| public void setDataIsNChar(boolean dataIsNChar) { |
| this.dataIsNChar = dataIsNChar; |
| } |
| |
| public void setIdIsBinary(boolean idIsBinary) { |
| this.idIsBinary = idIsBinary; |
| } |
| |
| public void setHasSplitDocs(boolean hasSplitDocs) { |
| this.hasSplitDocs = hasSplitDocs; |
| this.columnProperties = Collections.unmodifiableSet(hasSplitDocs ? COLUMNPROPERTIES2 : COLUMNPROPERTIES) ; |
| } |
| |
| public void setHasVersion(boolean hasVersion) { |
| this.hasVersion = hasVersion; |
| } |
| |
| public void setDataLimitInOctets(int dataLimitInOctets) { |
| this.dataLimitInOctets = dataLimitInOctets; |
| } |
| |
| public void setSchemaInfo(String schemaInfo) { |
| this.schemaInfo = schemaInfo; |
| } |
| |
| public void setIndexInfo(String indexInfo) { |
| this.indexInfo = indexInfo; |
| } |
| } |
| |
| private final Map<Collection<? extends Document>, RDBTableMetaData> tableMeta = new HashMap<Collection<? extends Document>, RDBTableMetaData>(); |
| |
| @Override |
| public void dispose() { |
| if (!this.tablesToBeDropped.isEmpty()) { |
| String dropped = ""; |
| LOG.debug("attempting to drop: " + this.tablesToBeDropped); |
| for (String tname : this.tablesToBeDropped) { |
| Connection con = null; |
| try { |
| con = this.ch.getRWConnection(); |
| Statement stmt = null; |
| try { |
| stmt = con.createStatement(); |
| stmt.execute("drop table " + tname); |
| stmt.close(); |
| con.commit(); |
| dropped += tname + " "; |
| } catch (SQLException ex) { |
| LOG.debug("attempting to drop: " + tname, ex); |
| } finally { |
| closeStatement(stmt); |
| } |
| } catch (SQLException ex) { |
| LOG.debug("attempting to drop: " + tname, ex); |
| } finally { |
| this.ch.closeConnection(con); |
| } |
| } |
| this.droppedTables = dropped.trim(); |
| } |
| try { |
| this.ch.close(); |
| } catch (IOException ex) { |
| LOG.error("closing connection handler", ex); |
| } |
| try { |
| this.nodesCache.close(); |
| } catch (IOException ex) { |
| LOG.warn("Error occurred while closing nodes cache", ex); |
| } |
| LOG.info("RDBDocumentStore (" + getModuleVersion() + ") disposed" + getCnStats() |
| + (this.droppedTables.isEmpty() ? "" : " (tables dropped: " + this.droppedTables + ")")); |
| } |
| |
| @Override |
| public <T extends Document> T getIfCached(Collection<T> collection, String id) { |
| if (collection != Collection.NODES) { |
| return null; |
| } else { |
| NodeDocument doc = nodesCache.getIfPresent(id); |
| doc = (doc != null) ? unwrap(doc) : null; |
| return castAsT(doc); |
| } |
| } |
| |
| private <T extends Document> T getIfCached(Collection<T> collection, String id, long modCount) { |
| T doc = getIfCached(collection, id); |
| if (doc != null && doc.getModCount() != null && doc.getModCount() == modCount) { |
| return doc; |
| } else { |
| return null; |
| } |
| } |
| |
| @Override |
| public Iterable<CacheStats> getCacheStats() { |
| return nodesCache.getCacheStats(); |
| } |
| |
| @Override |
| public Map<String, String> getMetadata() { |
| return metadata; |
| } |
| |
| /** |
| * Statistics are generated for each table. The following fields are always |
| * added: |
| * <dl> |
| * <dt><em>tableName</em>.ns</dt> |
| * <dd>fully qualified name of the database table</dd> |
| * <dt><em>tableName</em>.schemaInfo</dt> |
| * <dd>DDL information for table, as obtained during startup</dd> |
| * <dt><em>tableName</em>.indexInfo</dt> |
| * <dd>DDL information for associated indexes, as obtained during |
| * startup</dd> |
| * <dt><em>tableName</em>.count</dt> |
| * <dd>exact number of rows</dd> |
| * </dl> |
| * In addition, some statistics information for |
| * {@link Collection#CLUSTER_NODES} is added: |
| * <dl> |
| * <dt>clusterNodes.updates</dt> |
| * <dd>Writes to the table, counted by cluster node ID</dd> |
| * </dl> |
| * Finally, additional database-specific statistics may be added; see |
| * descriptions in |
| * {@link RDBDocumentStoreDB#getAdditionalStatistics(RDBConnectionHandler, String, String)} |
| * for details. |
| **/ |
| @NotNull |
| @Override |
| public Map<String, String> getStats() { |
| ImmutableMap.Builder<String, String> builder = ImmutableMap.builder(); |
| tableMeta.forEach((k, v) -> toMapBuilder(builder, k, v)); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("statistics obtained: " + builder.toString()); |
| } |
| return builder.build(); |
| } |
| |
| private <T extends Document> void toMapBuilder(ImmutableMap.Builder<String, String> builder, Collection<T> collection, RDBTableMetaData meta) { |
| String prefix = collection.toString(); |
| builder.put(prefix + ".ns", meta.getCatalog() + "." + meta.getName()); |
| builder.put(prefix + ".schemaInfo", meta.getSchemaInfo()); |
| builder.put(prefix + ".indexInfo", meta.getIndexInfo()); |
| if (Collection.CLUSTER_NODES.equals(collection)) { |
| builder.put(prefix + ".updates", getCnStats()); |
| } |
| // live data |
| Map<String, String> map = this.dbInfo.getAdditionalStatistics(this.ch, meta.getCatalog(), meta.getName()); |
| map.forEach((k, v) -> builder.put(prefix + "." + k, v)); |
| try { |
| long c = queryCount(collection, null, null, Collections.emptyList(), Collections.emptyList()); |
| builder.put(prefix + ".count", Long.toString(c)); |
| } |
| catch (DocumentStoreException ex) { |
| LOG.debug("getting entry count for " + prefix, ex); |
| } |
| } |
| |
| // implementation |
| |
| private static final String MODIFIED = "_modified"; |
| private static final String MODCOUNT = "_modCount"; |
| |
| /** |
| * Optional counter for changes to "_collisions" map ({@link NodeDocument#COLLISIONS}). |
| */ |
| public static final String COLLISIONSMODCOUNT = "_collisionsModCount"; |
| |
| private static final String ID = "_id"; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(RDBDocumentStore.class); |
| |
| private Exception callStack; |
| |
| private RDBConnectionHandler ch; |
| |
| // from options |
| private Set<String> tablesToBeDropped = new HashSet<String>(); |
| |
| // ratio between Java characters and UTF-8 encoding |
| // a) single characters will fit into 3 bytes |
| // b) a surrogate pair (two Java characters) will fit into 4 bytes |
| // thus... |
| public static final int CHAR2OCTETRATIO = 3; |
| |
| // number of retries for updates |
| private static final int RETRIES = 10; |
| |
| // see OAK-2044 |
| protected static final boolean USECMODCOUNT = true; |
| |
| // Database schema supported by this version |
| protected static final int SCHEMA = 2; |
| |
| private static final Key MODIFIEDKEY = new Key(MODIFIED, null); |
| |
| // DB-specific information |
| private RDBDocumentStoreDB dbInfo; |
| |
| // utility class for performing low-level operations |
| private RDBDocumentStoreJDBC db; |
| |
| protected static final List<String> EMPTY_KEY_PATTERN = Collections.emptyList(); |
| |
| private Map<String, String> metadata; |
| |
| private DocumentStoreStatsCollector stats; |
| |
| private boolean readOnly; |
| |
| // VERSION column mapping in queries used by RDBVersionGCSupport |
| public static String VERSIONPROP = "__version"; |
| |
| // set of supported indexed properties |
| private static final Set<String> INDEXEDPROPERTIES = new HashSet<String>(Arrays.asList(new String[] { MODIFIED, |
| NodeDocument.HAS_BINARY_FLAG, NodeDocument.DELETED_ONCE, NodeDocument.SD_TYPE, NodeDocument.SD_MAX_REV_TIME_IN_SECS, VERSIONPROP })); |
| |
| // set of required table columns |
| private static final Set<String> REQUIREDCOLUMNS = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList( |
| new String[] { "id", "dsize", "deletedonce", "bdata", "data", "cmodcount", "modcount", "hasbinary", "modified" }))); |
| |
| // set of optional table columns |
| private static final Set<String> OPTIONALCOLUMNS = Collections |
| .unmodifiableSet(new HashSet<String>(Arrays.asList(new String[] { "version", "sdtype", "sdmaxrevtime" }))); |
| |
| // set of properties not serialized to JSON |
| private static final Set<String> COLUMNPROPERTIES = new HashSet<String>(Arrays.asList( |
| new String[] { ID, NodeDocument.HAS_BINARY_FLAG, NodeDocument.DELETED_ONCE, COLLISIONSMODCOUNT, MODIFIED, MODCOUNT })); |
| // set of properties not serialized to JSON, schema version 2 |
| private static final Set<String> COLUMNPROPERTIES2 = new HashSet<String>(Arrays.asList( |
| new String[] { ID, NodeDocument.HAS_BINARY_FLAG, NodeDocument.DELETED_ONCE, COLLISIONSMODCOUNT, MODIFIED, MODCOUNT, |
| NodeDocument.SD_TYPE, NodeDocument.SD_MAX_REV_TIME_IN_SECS, VERSIONPROP })); |
| |
| private final RDBDocumentSerializer ser = new RDBDocumentSerializer(this); |
| |
| private void initialize(DataSource ds, DocumentNodeStoreBuilder<?> builder, RDBOptions options) throws Exception { |
| this.stats = builder.getDocumentStoreStatsCollector(); |
| |
| this.callStack = LOG.isDebugEnabled() ? new Exception("call stack of RDBDocumentStore creation") : null; |
| |
| this.readOnly = builder.getReadOnlyMode(); |
| |
| this.ch = new RDBConnectionHandler(ds); |
| Connection con = this.ch.getRWConnection(); |
| String catalog = con.getCatalog(); |
| DatabaseMetaData md = con.getMetaData(); |
| if (null == catalog) { |
| // Oracle |
| catalog = md.getUserName(); |
| } |
| |
| this.tableMeta.put(Collection.NODES, |
| new RDBTableMetaData(catalog, createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.NODES)))); |
| this.tableMeta.put(Collection.CLUSTER_NODES, |
| new RDBTableMetaData(catalog, createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.CLUSTER_NODES)))); |
| this.tableMeta.put(Collection.JOURNAL, |
| new RDBTableMetaData(catalog, createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.JOURNAL)))); |
| this.tableMeta.put(Collection.SETTINGS, |
| new RDBTableMetaData(catalog, createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.SETTINGS)))); |
| |
| |
| this.locks = new StripedNodeDocumentLocks(); |
| this.nodesCache = builder.buildNodeDocumentCache(this, locks); |
| |
| int isolation = con.getTransactionIsolation(); |
| String isolationDiags = RDBJDBCTools.isolationLevelToString(isolation); |
| if (isolation != Connection.TRANSACTION_READ_COMMITTED) { |
| LOG.info("Detected transaction isolation level " + isolationDiags + " is " |
| + (isolation < Connection.TRANSACTION_READ_COMMITTED ? "lower" : "higher") + " than expected " |
| + RDBJDBCTools.isolationLevelToString(Connection.TRANSACTION_READ_COMMITTED) |
| + " - check datasource configuration"); |
| } |
| |
| String dbDesc = String.format("%s %s (%d.%d)", md.getDatabaseProductName(), md.getDatabaseProductVersion(), |
| md.getDatabaseMajorVersion(), md.getDatabaseMinorVersion()).replaceAll("[\r\n\t]", " ").trim(); |
| String driverDesc = String.format("%s %s (%d.%d)", md.getDriverName(), md.getDriverVersion(), md.getDriverMajorVersion(), |
| md.getDriverMinorVersion()).replaceAll("[\r\n\t]", " ").trim(); |
| String dbUrl = md.getURL(); |
| |
| this.dbInfo = RDBDocumentStoreDB.getValue(md.getDatabaseProductName()); |
| this.db = new RDBDocumentStoreJDBC(this.dbInfo, this.ser, QUERYHITSLIMIT, QUERYTIMELIMIT); |
| this.metadata = ImmutableMap.<String,String>builder() |
| .put("type", "rdb") |
| .put("db", md.getDatabaseProductName()) |
| .put("version", md.getDatabaseProductVersion()) |
| .put("driver", md.getDriverName()) |
| .put("driverVersion", md.getDriverVersion()) |
| .build(); |
| String versionDiags = dbInfo.checkVersion(md); |
| if (!versionDiags.isEmpty()) { |
| LOG.error(versionDiags); |
| } |
| |
| if (! "".equals(dbInfo.getInitializationStatement())) { |
| Statement stmt = null; |
| try { |
| stmt = con.createStatement(); |
| stmt.execute(dbInfo.getInitializationStatement()); |
| stmt.close(); |
| con.commit(); |
| } |
| finally { |
| closeStatement(stmt); |
| } |
| } |
| |
| List<String> tablesCreated = new ArrayList<String>(); |
| List<String> tablesPresent = new ArrayList<String>(); |
| try { |
| createTableFor(con, Collection.CLUSTER_NODES, this.tableMeta.get(Collection.CLUSTER_NODES), tablesCreated, |
| tablesPresent, options.getInitialSchema(), options.getUpgradeToSchema()); |
| createTableFor(con, Collection.NODES, this.tableMeta.get(Collection.NODES), tablesCreated, tablesPresent, |
| options.getInitialSchema(), options.getUpgradeToSchema()); |
| createTableFor(con, Collection.SETTINGS, this.tableMeta.get(Collection.SETTINGS), tablesCreated, tablesPresent, |
| options.getInitialSchema(), options.getUpgradeToSchema()); |
| createTableFor(con, Collection.JOURNAL, this.tableMeta.get(Collection.JOURNAL), tablesCreated, tablesPresent, |
| options.getInitialSchema(), options.getUpgradeToSchema()); |
| } finally { |
| con.commit(); |
| con.close(); |
| } |
| |
| StringBuilder tableDiags = new StringBuilder(); |
| RDBTableMetaData nodesMeta = this.tableMeta.get(Collection.NODES); |
| tableDiags.append(nodesMeta.getSchemaInfo()); |
| if (!nodesMeta.getIndexInfo().isEmpty()) { |
| tableDiags.append(" /* ").append(nodesMeta.getIndexInfo()).append(" */"); |
| } |
| |
| if (options.isDropTablesOnClose()) { |
| tablesToBeDropped.addAll(tablesCreated); |
| } |
| |
| if (tableDiags.length() != 0) { |
| tableDiags.insert(0, ", "); |
| } |
| |
| Map<String, String> diag = dbInfo.getAdditionalDiagnostics(this.ch, this.tableMeta.get(Collection.NODES).getName()); |
| |
| LOG.info("RDBDocumentStore (" + getModuleVersion() + ") instantiated for database " + dbDesc + ", using driver: " |
| + driverDesc + ", connecting to: " + dbUrl + (diag.isEmpty() ? "" : (", properties: " + diag.toString())) |
| + ", transaction isolation level: " + isolationDiags + tableDiags); |
| if (!tablesPresent.isEmpty()) { |
| LOG.info("Tables present upon startup: " + tablesPresent); |
| } |
| if (!tablesCreated.isEmpty()) { |
| LOG.info("Tables created upon startup: " + tablesCreated |
| + (options.isDropTablesOnClose() ? " (will be dropped on exit)" : "")); |
| } |
| } |
| |
| private static boolean isBinaryType(int sqlType) { |
| return sqlType == Types.VARBINARY || sqlType == Types.BINARY || sqlType == Types.LONGVARBINARY; |
| } |
| |
| private static boolean isNChar(int sqlType) { |
| return sqlType == Types.NCHAR || sqlType == Types.NVARCHAR || sqlType == Types.LONGNVARCHAR; |
| } |
| |
| private static void obtainFlagsFromResultSetMeta(ResultSetMetaData met, RDBTableMetaData tmd) throws SQLException { |
| |
| for (int i = 1; i <= met.getColumnCount(); i++) { |
| String lcName = met.getColumnName(i).toLowerCase(Locale.ENGLISH); |
| if ("id".equals(lcName)) { |
| tmd.setIdIsBinary(isBinaryType(met.getColumnType(i))); |
| } |
| if ("data".equals(lcName)) { |
| tmd.setDataLimitInOctets(met.getPrecision(i)); |
| tmd.setDataIsNChar(isNChar(met.getColumnType(i))); |
| } |
| if ("version".equals(lcName)) { |
| tmd.setHasVersion(true); |
| } |
| if ("sdtype".equals(lcName)) { |
| tmd.setHasSplitDocs(true); |
| } |
| } |
| } |
| |
| private static String asQualifiedDbName(String one, String two) { |
| one = Strings.nullToEmpty(one).trim(); |
| two = Strings.nullToEmpty(two).trim(); |
| |
| if (one.isEmpty() && two.isEmpty()) { |
| return null; |
| } else { |
| one = Strings.nullToEmpty(one).trim(); |
| two = two == null ? "" : two.trim(); |
| return one.isEmpty() ? two : one + "." + two; |
| } |
| } |
| |
| private static String indexTypeAsString(int type) { |
| switch (type) { |
| case DatabaseMetaData.tableIndexClustered: |
| return "clustered"; |
| case DatabaseMetaData.tableIndexHashed: |
| return "hashed"; |
| case DatabaseMetaData.tableIndexStatistic: |
| return "statistic"; |
| case DatabaseMetaData.tableIndexOther: |
| return "other"; |
| default: |
| return "indexType=" + type; |
| } |
| } |
| |
| private static String dumpIndexData(DatabaseMetaData met, ResultSetMetaData rmet, String tableName, Set<String> indexedColumns) { |
| |
| ResultSet rs = null; |
| try { |
| // if the result set metadata provides a table name, use that (the |
| // other one might be inaccurate due to case insensitivity issues) |
| String rmetTableName = Strings.nullToEmpty(rmet.getTableName(1)).trim(); |
| if (!rmetTableName.isEmpty()) { |
| tableName = rmetTableName; |
| } |
| |
| String rmetSchemaName = Strings.nullToEmpty(rmet.getSchemaName(1)).trim(); |
| |
| rs = met.getIndexInfo(null, null, tableName, false, true); |
| |
| Map<String, IndexInformation> indices = getIndexInformation(rs, rmetSchemaName); |
| if (indices.isEmpty() && !tableName.equals(tableName.toUpperCase(Locale.ENGLISH))) { |
| // might have failed due to the DB's handling on ucase/lcase, |
| // retry ucase |
| rs = met.getIndexInfo(null, null, tableName.toUpperCase(Locale.ENGLISH), false, true); |
| indices = getIndexInformation(rs, rmetSchemaName); |
| } |
| if (indexedColumns != null) { |
| for (IndexInformation idata : indices.values()) { |
| indexedColumns.addAll(idata.columns); |
| } |
| } |
| return dumpIndexData(indices); |
| } catch (SQLException ex) { |
| // well it was best-effort |
| String message = String.format("exception while retrieving index information: %s, code %d, state %s", ex.getMessage(), |
| ex.getErrorCode(), ex.getSQLState()); |
| LOG.debug(message, ex); |
| return "/* " + message + "*/"; |
| } finally { |
| closeResultSet(rs); |
| } |
| } |
| |
| private static String dumpIndexData(Map<String, IndexInformation> indices) { |
| StringBuilder sb = new StringBuilder(); |
| for (Entry<String, IndexInformation> index : indices.entrySet()) { |
| String indexName = index.getKey(); |
| IndexInformation info = index.getValue(); |
| if (!info.fields.isEmpty()) { |
| if (sb.length() != 0) { |
| sb.append(", "); |
| } |
| sb.append(String.format("%sindex %s on %s (", info.nonunique ? "" : "unique ", indexName, info.tname)); |
| String delim = ""; |
| for (String field : info.fields.values()) { |
| sb.append(delim); |
| delim = ", "; |
| sb.append(field); |
| } |
| sb.append(")"); |
| sb.append(" ").append(info.type); |
| } |
| if (info.filterCondition != null) { |
| sb.append(" where ").append(info.filterCondition); |
| } |
| sb.append(String.format(" (#%d, p%d)", info.cardinality, info.pages)); |
| } |
| return sb.toString(); |
| } |
| |
| // see https://docs.oracle.com/javase/7/docs/api/java/sql/DatabaseMetaData.html#getIndexInfo(java.lang.String,%20java.lang.String,%20java.lang.String,%20boolean,%20boolean) |
| private static Map<String, IndexInformation> getIndexInformation(ResultSet rs, String rmetSchemaName) throws SQLException { |
| Map<String, IndexInformation> result = new TreeMap<>(); |
| while (rs.next()) { |
| String name = asQualifiedDbName(rs.getString("INDEX_QUALIFIER"), rs.getString("INDEX_NAME")); |
| if (name != null) { |
| IndexInformation info = result.get(name); |
| if (info == null) { |
| info = new IndexInformation(); |
| result.put(name, info); |
| info.fields = new TreeMap<>(); |
| } |
| info.nonunique = rs.getBoolean("NON_UNIQUE"); |
| info.type = indexTypeAsString(rs.getInt("TYPE")); |
| String inSchema = rs.getString("TABLE_SCHEM"); |
| inSchema = Strings.nullToEmpty(inSchema).trim(); |
| String filterCondition = Strings.nullToEmpty(rs.getString("FILTER_CONDITION")).trim(); |
| if (!filterCondition.isEmpty()) { |
| info.filterCondition = filterCondition; |
| } |
| info.cardinality = rs.getInt("CARDINALITY"); |
| info.pages = rs.getInt("PAGES"); |
| Set<String> columns = new HashSet<>(); |
| info.columns = columns; |
| // skip indices on tables in other schemas in case we have that information |
| if (rmetSchemaName.isEmpty() || inSchema.isEmpty() || rmetSchemaName.equals(inSchema)) { |
| String tname = asQualifiedDbName(inSchema, rs.getString("TABLE_NAME")); |
| info.tname = tname; |
| String cname = rs.getString("COLUMN_NAME"); |
| if (cname != null) { |
| columns.add(cname.toUpperCase(Locale.ENGLISH)); |
| String order = "A".equals(rs.getString("ASC_OR_DESC")) ? " ASC" : ("D".equals(rs.getString("ASC_OR_DESC")) ? " DESC" : ""); |
| info.fields.put(rs.getInt("ORDINAL_POSITION"), cname + order); |
| } |
| } |
| } |
| } |
| return result; |
| } |
| |
| private static class IndexInformation { |
| public Map<Integer, String> fields; |
| public boolean nonunique; |
| public String type; |
| public String filterCondition; |
| public int cardinality; |
| public int pages; |
| public Set<String> columns; |
| public String tname; |
| } |
| |
| private void createTableFor(Connection con, Collection<? extends Document> col, RDBTableMetaData tmd, List<String> tablesCreated, |
| List<String> tablesPresent, int initialSchema, int upgradeToSchema) throws SQLException { |
| |
| String dbname = this.dbInfo.toString(); |
| if (con.getMetaData().getURL() != null) { |
| dbname += " (" + con.getMetaData().getURL() + ")"; |
| } |
| String tableName = tmd.getName(); |
| |
| Statement checkStatement = null; |
| |
| ResultSet checkResultSet = null; |
| Statement creatStatement = null; |
| |
| try { |
| // avoid PreparedStatement due to weird DB2 behavior (OAK-6237) |
| checkStatement = con.createStatement(); |
| checkResultSet = checkStatement.executeQuery("select * from " + tableName + " where ID = '0'"); |
| |
| // try to discover size of DATA column and binary-ness of ID |
| ResultSetMetaData met = checkResultSet.getMetaData(); |
| obtainFlagsFromResultSetMeta(met, tmd); |
| |
| // check that all required columns are present |
| Set<String> requiredColumns = new HashSet<String>(REQUIREDCOLUMNS); |
| Set<String> unknownColumns = new HashSet<String>(); |
| boolean hasVersionColumn = false; |
| boolean hasSDTypeColumn = false; |
| for (int i = 1; i <= met.getColumnCount(); i++) { |
| String cname = met.getColumnName(i).toLowerCase(Locale.ENGLISH); |
| if (!requiredColumns.remove(cname)) { |
| if (!OPTIONALCOLUMNS.contains(cname)) { |
| unknownColumns.add(cname); |
| } |
| } |
| if (cname.equals("version")) { |
| hasVersionColumn = true; |
| } |
| if (cname.equals("sdtype")) { |
| hasSDTypeColumn = true; |
| } |
| } |
| |
| if (!requiredColumns.isEmpty()) { |
| String message = String.format("Table %s: the following required columns are missing: %s", tableName, |
| requiredColumns.toString()); |
| LOG.error(message); |
| throw new DocumentStoreException(message); |
| } |
| |
| if (!unknownColumns.isEmpty()) { |
| String message = String.format("Table %s: the following columns are unknown and will not be maintained: %s", |
| tableName, unknownColumns.toString()); |
| LOG.info(message); |
| } |
| |
| String tableInfo = RDBJDBCTools.dumpResultSetMeta(met); |
| tmd.setSchemaInfo(tableInfo); |
| Set<String> indexOn = new HashSet<String>(); |
| String indexInfo = dumpIndexData(con.getMetaData(), met, tableName, indexOn); |
| tmd.setIndexInfo(indexInfo); |
| |
| closeResultSet(checkResultSet); |
| boolean dbWasChanged = false; |
| |
| if (this.readOnly) { |
| LOG.debug("Skipping table update code because store is initialized in readOnly mode"); |
| } |
| else { |
| if (!hasVersionColumn && upgradeToSchema >= 1) { |
| dbWasChanged |= upgradeTable(con, tableName, 1); |
| } |
| |
| if (!hasSDTypeColumn && upgradeToSchema >= 2) { |
| dbWasChanged |= upgradeTable(con, tableName, 2); |
| } |
| |
| if (!indexOn.contains("MODIFIED") && col == Collection.NODES) { |
| dbWasChanged |= addModifiedIndex(con, tableName); |
| } |
| } |
| |
| tablesPresent.add(tableName); |
| |
| if (dbWasChanged) { |
| getTableMetaData(con, col, tmd); |
| } |
| } catch (SQLException ex) { |
| // table does not appear to exist |
| con.rollback(); |
| |
| LOG.debug("trying to read from '" + tableName + "'", ex); |
| if (this.readOnly) { |
| throw new SQLException("Would like to create table '" + tableName |
| + "', but RDBDocumentStore has been initialized in 'readonly' mode"); |
| } |
| |
| try { |
| creatStatement = con.createStatement(); |
| creatStatement.execute(this.dbInfo.getTableCreationStatement(tableName, initialSchema)); |
| creatStatement.close(); |
| |
| for (String ic : this.dbInfo.getIndexCreationStatements(tableName, initialSchema)) { |
| creatStatement = con.createStatement(); |
| creatStatement.execute(ic); |
| creatStatement.close(); |
| } |
| |
| con.commit(); |
| |
| if (initialSchema < 1 && upgradeToSchema >= 1) { |
| upgradeTable(con, tableName, 1); |
| } |
| |
| if (initialSchema < 2 && upgradeToSchema >= 2) { |
| upgradeTable(con, tableName, 2); |
| } |
| |
| tablesCreated.add(tableName); |
| |
| getTableMetaData(con, col, tmd); |
| } |
| catch (SQLException ex2) { |
| LOG.error("Failed to create table '" + tableName + "' in '" + dbname + "'", ex2); |
| throw ex2; |
| } |
| } |
| finally { |
| closeResultSet(checkResultSet); |
| closeStatement(checkStatement); |
| closeStatement(creatStatement); |
| } |
| } |
| |
| private boolean upgradeTable(Connection con, String tableName, int level) throws SQLException { |
| boolean wasChanged = false; |
| |
| for (String statement : this.dbInfo.getTableUpgradeStatements(tableName, level)) { |
| Statement upgradeStatement = null; |
| try { |
| upgradeStatement = con.createStatement(); |
| upgradeStatement.execute(statement); |
| upgradeStatement.close(); |
| con.commit(); |
| LOG.info("Upgraded " + tableName + " to DB level " + level + " using '" + statement + "'"); |
| wasChanged = true; |
| } catch (SQLException exup) { |
| con.rollback(); |
| String message = String.format( |
| "Attempted to upgrade %s to DB level %d using '%s', but failed with SQLException '%s' (code: %d/state: %s) - will continue without.", |
| tableName, level, statement, exup.getMessage(), exup.getErrorCode(), exup.getSQLState()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(message, exup); |
| } else { |
| LOG.info(message); |
| } |
| } finally { |
| closeStatement(upgradeStatement); |
| } |
| } |
| |
| return wasChanged; |
| } |
| |
| private boolean addModifiedIndex(Connection con, String tableName) throws SQLException { |
| boolean wasChanged = false; |
| |
| String statement = this.dbInfo.getModifiedIndexStatement(tableName); |
| Statement upgradeStatement = null; |
| try { |
| upgradeStatement = con.createStatement(); |
| upgradeStatement.execute(statement); |
| upgradeStatement.close(); |
| con.commit(); |
| LOG.info("Added 'modified' index to " + tableName + " using '" + statement + "'"); |
| wasChanged = true; |
| } catch (SQLException exup) { |
| con.rollback(); |
| String message = String.format( |
| "Attempted to add 'modified' index to %s using '%s', but failed with SQLException '%s' (code: %d/state: %s) - will continue without.", |
| tableName, statement, exup.getMessage(), exup.getErrorCode(), exup.getSQLState()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(message, exup); |
| } else { |
| LOG.info(message); |
| } |
| } finally { |
| closeStatement(upgradeStatement); |
| } |
| |
| return wasChanged; |
| } |
| |
| private static void getTableMetaData(Connection con, Collection<? extends Document> col, RDBTableMetaData tmd) throws SQLException { |
| Statement checkStatement = null; |
| ResultSet checkResultSet = null; |
| |
| try { |
| checkStatement = con.createStatement(); |
| checkResultSet = checkStatement.executeQuery("select * from " + tmd.getName() + " where ID = '0'"); |
| |
| // try to discover size of DATA column and binary-ness of ID |
| ResultSetMetaData met = checkResultSet.getMetaData(); |
| obtainFlagsFromResultSetMeta(met, tmd); |
| |
| String tableInfo = RDBJDBCTools.dumpResultSetMeta(met); |
| tmd.setSchemaInfo(tableInfo); |
| String indexInfo = dumpIndexData(con.getMetaData(), met, tmd.getName(), null); |
| tmd.setIndexInfo(indexInfo); |
| } finally { |
| closeResultSet(checkResultSet); |
| closeStatement(checkStatement); |
| } |
| } |
| |
| public boolean isReadOnly() { |
| return readOnly; |
| } |
| |
| @Override |
| protected void finalize() throws Throwable { |
| if (!this.ch.isClosed() && this.callStack != null) { |
| LOG.debug("finalizing RDBDocumentStore that was not disposed", this.callStack); |
| } |
| super.finalize(); |
| } |
| |
| private <T extends Document> T readDocumentCached(final Collection<T> collection, final String id, int maxCacheAge) { |
| if (collection != Collection.NODES) { |
| return readDocumentUncached(collection, id, null); |
| } else { |
| NodeDocument doc = null; |
| if (maxCacheAge > 0) { |
| // first try without lock |
| doc = nodesCache.getIfPresent(id); |
| if (doc != null) { |
| long lastCheckTime = doc.getLastCheckTime(); |
| if (lastCheckTime != 0) { |
| if (maxCacheAge == Integer.MAX_VALUE || System.currentTimeMillis() - lastCheckTime < maxCacheAge) { |
| stats.doneFindCached(Collection.NODES, id); |
| return castAsT(unwrap(doc)); |
| } |
| } |
| } |
| } |
| try { |
| try (CacheLock lock = acquireLockFor(id)) { |
| // caller really wants the cache to be cleared |
| if (maxCacheAge == 0) { |
| invalidateNodesCache(id, true); |
| doc = null; |
| } |
| final NodeDocument cachedDoc = doc; |
| doc = nodesCache.get(id, new Callable<NodeDocument>() { |
| @Override |
| public NodeDocument call() throws Exception { |
| NodeDocument doc = (NodeDocument) readDocumentUncached(collection, id, cachedDoc); |
| if (doc != null) { |
| doc.seal(); |
| } |
| return wrap(doc); |
| } |
| }); |
| // inspect the doc whether it can be used |
| long lastCheckTime = doc.getLastCheckTime(); |
| if (lastCheckTime != 0 && (maxCacheAge == 0 || maxCacheAge == Integer.MAX_VALUE)) { |
| // we either just cleared the cache or the caller does |
| // not care; |
| } else if (lastCheckTime != 0 && (System.currentTimeMillis() - lastCheckTime < maxCacheAge)) { |
| // is new enough |
| } else { |
| // need to at least revalidate |
| NodeDocument ndoc = (NodeDocument) readDocumentUncached(collection, id, cachedDoc); |
| if (ndoc != null) { |
| ndoc.seal(); |
| } |
| doc = wrap(ndoc); |
| nodesCache.put(doc); |
| } |
| } |
| return castAsT(unwrap(doc)); |
| } catch (ExecutionException e) { |
| throw new IllegalStateException("Failed to load document with " + id, e); |
| } |
| } |
| } |
| |
| @Nullable |
| private <T extends Document> boolean internalCreate(Collection<T> collection, List<UpdateOp> updates) { |
| final Stopwatch watch = startWatch(); |
| List<String> ids = new ArrayList<String>(updates.size()); |
| boolean success = true; |
| try { |
| |
| // try up to CHUNKSIZE ops in one transaction |
| for (List<UpdateOp> chunks : Lists.partition(updates, CHUNKSIZE)) { |
| List<T> docs = new ArrayList<T>(); |
| for (UpdateOp update : chunks) { |
| ids.add(update.getId()); |
| maintainUpdateStats(collection, update.getId()); |
| UpdateUtils.assertUnconditional(update); |
| T doc = collection.newDocument(this); |
| addUpdateCounters(update); |
| UpdateUtils.applyChanges(doc, update); |
| if (!update.getId().equals(doc.getId())) { |
| throw new DocumentStoreException("ID mismatch - UpdateOp: " + update.getId() + ", ID property: " |
| + doc.getId()); |
| } |
| docs.add(doc); |
| } |
| boolean done = insertDocuments(collection, docs); |
| if (done) { |
| if (collection == Collection.NODES) { |
| for (T doc : docs) { |
| nodesCache.putIfAbsent((NodeDocument) doc); |
| } |
| } |
| } else { |
| success = false; |
| } |
| } |
| return success; |
| } catch (DocumentStoreException ex) { |
| return false; |
| } finally { |
| stats.doneCreate(watch.elapsed(TimeUnit.NANOSECONDS), collection, ids, success); |
| } |
| } |
| |
| @Nullable |
| private <T extends Document> T internalCreateOrUpdate(Collection<T> collection, UpdateOp update, boolean allowCreate, |
| boolean checkConditions, int retries) { |
| T oldDoc = readDocumentCached(collection, update.getId(), Integer.MAX_VALUE); |
| |
| if (oldDoc == null) { |
| if (!allowCreate || !update.isNew()) { |
| return null; |
| } |
| T doc = collection.newDocument(this); |
| if (checkConditions && !checkConditions(doc, update.getConditions())) { |
| return null; |
| } |
| addUpdateCounters(update); |
| UpdateUtils.applyChanges(doc, update); |
| try { |
| Stopwatch watch = startWatch(); |
| if (!insertDocuments(collection, Collections.singletonList(doc))) { |
| throw new DocumentStoreException("Can't insert the document: " + doc.getId()); |
| } |
| if (collection == Collection.NODES) { |
| nodesCache.putIfAbsent((NodeDocument) doc); |
| } |
| stats.doneFindAndModify(watch.elapsed(TimeUnit.NANOSECONDS), collection, update.getId(), true, true, 0); |
| return oldDoc; |
| } catch (DocumentStoreException ex) { |
| // may have failed due to a race condition; try update instead |
| // this is an edge case, so it's ok to bypass the cache |
| // (avoiding a race condition where the DB is already updated |
| // but the cache is not) |
| oldDoc = readDocumentUncached(collection, update.getId(), null); |
| if (oldDoc == null) { |
| // something else went wrong |
| LOG.error("insert failed, but document " + update.getId() + " is not present, aborting", ex); |
| throw (ex); |
| } |
| return internalUpdate(collection, update, oldDoc, checkConditions, retries); |
| } |
| } else { |
| T result = internalUpdate(collection, update, oldDoc, checkConditions, retries); |
| if (allowCreate && result == null) { |
| if (retries > 0) { |
| result = internalCreateOrUpdate(collection, update, allowCreate, checkConditions, retries - 1); |
| } |
| else { |
| LOG.error("update of " + update.getId() + " failed, race condition?"); |
| throw new DocumentStoreException("update of " + update.getId() + " failed, race condition?", null, |
| DocumentStoreException.Type.TRANSIENT); |
| } |
| } |
| return result; |
| } |
| } |
| |
| /** |
| * @return previous version of document or <code>null</code> |
| */ |
| @Nullable |
| private <T extends Document> T internalUpdate(Collection<T> collection, UpdateOp update, T oldDoc, boolean checkConditions, |
| int maxRetries) { |
| if (checkConditions && !UpdateUtils.checkConditions(oldDoc, update.getConditions())) { |
| return null; |
| } else { |
| maintainUpdateStats(collection, update.getId()); |
| addUpdateCounters(update); |
| T doc = createNewDocument(collection, oldDoc, update); |
| final Stopwatch watch = startWatch(); |
| boolean success = false; |
| int retries = maxRetries; |
| try (CacheLock lock = acquireLockFor(update.getId())) { |
| while (!success && retries > 0) { |
| long lastmodcount = modcountOf(oldDoc); |
| success = updateDocument(collection, doc, update, lastmodcount); |
| if (!success) { |
| retries -= 1; |
| oldDoc = readDocumentCached(collection, update.getId(), Integer.MAX_VALUE); |
| if (oldDoc != null) { |
| long newmodcount = modcountOf(oldDoc); |
| if (lastmodcount == newmodcount) { |
| // cached copy did not change so it probably was |
| // updated by a different instance, get a fresh one |
| LOG.debug("suspect update from different instance (current modcount: {}), refetching: {}...", |
| newmodcount, update.getId()); |
| if (collection == Collection.NODES) { |
| nodesCache.invalidate(update.getId()); |
| } |
| oldDoc = readDocumentUncached(collection, update.getId(), null); |
| if (oldDoc == null) { |
| LOG.debug("after refetch: {} is gone", update.getId()); |
| } else { |
| LOG.debug("after refetch: modcount for {} is {}", update.getId(), modcountOf(oldDoc)); |
| } |
| } |
| } |
| |
| if (oldDoc == null) { |
| // document was there but is now gone |
| LOG.debug("failed to apply update because document is gone in the meantime: " + update.getId(), new Exception("call stack")); |
| return null; |
| } |
| |
| if (checkConditions && !UpdateUtils.checkConditions(oldDoc, update.getConditions())) { |
| return null; |
| } |
| else { |
| addUpdateCounters(update); |
| doc = createNewDocument(collection, oldDoc, update); |
| } |
| } else { |
| if (collection == Collection.NODES) { |
| nodesCache.replaceCachedDocument((NodeDocument) oldDoc, (NodeDocument) doc); |
| } |
| } |
| } |
| |
| if (!success) { |
| throw new DocumentStoreException("failed update of " + doc.getId() + " (race?) after " + maxRetries |
| + " retries", null, DocumentStoreException.Type.TRANSIENT); |
| } |
| |
| return oldDoc; |
| } finally { |
| int numOfAttempts = maxRetries - retries - 1; |
| stats.doneFindAndModify(watch.elapsed(TimeUnit.NANOSECONDS), collection, |
| update.getId(), false, success, numOfAttempts); |
| } |
| } |
| } |
| |
| @NotNull |
| private <T extends Document> T createNewDocument(Collection<T> collection, T oldDoc, UpdateOp update) { |
| T doc = collection.newDocument(this); |
| oldDoc.deepCopy(doc); |
| UpdateUtils.applyChanges(doc, update); |
| doc.seal(); |
| return doc; |
| } |
| |
| private static void addUpdateCounters(UpdateOp update) { |
| if (hasChangesToCollisions(update)) { |
| update.increment(COLLISIONSMODCOUNT, 1); |
| } |
| update.increment(MODCOUNT, 1); |
| } |
| |
| private <T extends Document> List<T> internalQuery(Collection<T> collection, String fromKey, String toKey, |
| List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit) { |
| Connection connection = null; |
| RDBTableMetaData tmd = getTable(collection); |
| for (QueryCondition cond : conditions) { |
| if (!INDEXEDPROPERTIES.contains(cond.getPropertyName())) { |
| String message = "indexed property " + cond.getPropertyName() + " not supported, query was '" + cond |
| + "'; supported properties are " + INDEXEDPROPERTIES; |
| LOG.info(message); |
| throw new DocumentStoreException(message); |
| } |
| } |
| |
| final Stopwatch watch = startWatch(); |
| int resultSize = 0; |
| |
| try (CacheChangesTracker tracker = obtainTracker(collection, fromKey, toKey)) { |
| long now = System.currentTimeMillis(); |
| connection = this.ch.getROConnection(); |
| String from = collection == Collection.NODES && NodeDocument.MIN_ID_VALUE.equals(fromKey) ? null : fromKey; |
| String to = collection == Collection.NODES && NodeDocument.MAX_ID_VALUE.equals(toKey) ? null : toKey; |
| |
| // OAK-6839: only populate the cache with *new* entries if the query |
| // isn't open-ended (something done by GC processes) |
| boolean populateCache = to != null; |
| |
| List<RDBRow> dbresult = db.query(connection, tmd, from, to, excludeKeyPatterns, conditions, limit); |
| connection.commit(); |
| |
| int size = dbresult.size(); |
| List<T> result = new ArrayList<T>(size); |
| for (int i = 0; i < size; i++) { |
| // free RDBRow as early as possible |
| RDBRow row = dbresult.set(i, null); |
| T doc = getIfCached(collection, row.getId(), row.getModcount()); |
| if (doc == null) { |
| // parse DB contents into document if and only if it's not |
| // already in the cache |
| doc = convertFromDBObject(collection, row); |
| } else { |
| // we got a document from the cache, thus collection is NODES |
| // and a tracker is present |
| long lastmodified = modifiedOf(doc); |
| if (lastmodified == row.getModified() && lastmodified >= 1) { |
| try (CacheLock lock = acquireLockFor(row.getId())) { |
| if (!tracker.mightBeenAffected(row.getId())) { |
| // otherwise mark it as fresh |
| ((NodeDocument) doc).markUpToDate(now); |
| } |
| } |
| } |
| else { |
| // we need a fresh document instance |
| doc = convertFromDBObject(collection, row); |
| } |
| } |
| result.add(doc); |
| } |
| if (collection == Collection.NODES) { |
| if (populateCache) { |
| nodesCache.putNonConflictingDocs(tracker, castAsNodeDocumentList(result)); |
| } else { |
| Map<String, ModificationStamp> invMap = Maps.newHashMap(); |
| for (Document doc : result) { |
| invMap.put(doc.getId(), new ModificationStamp(modcountOf(doc), modifiedOf(doc))); |
| } |
| nodesCache.invalidateOutdated(invMap); |
| } |
| } |
| resultSize = result.size(); |
| return result; |
| } catch (Exception ex) { |
| LOG.error("SQL exception on query", ex); |
| throw asDocumentStoreException(ex, "SQL exception on query"); |
| } finally { |
| this.ch.closeConnection(connection); |
| stats.doneQuery(watch.elapsed(TimeUnit.NANOSECONDS), collection, fromKey, toKey, |
| !conditions.isEmpty(), resultSize, -1, false); |
| } |
| } |
| |
| private static interface MyCloseableIterable<T> extends Closeable, Iterable<T> { |
| } |
| |
| protected <T extends Document> Iterable<T> queryAsIterable(final Collection<T> collection, String fromKey, String toKey, |
| final List<String> excludeKeyPatterns, final List<QueryCondition> conditions, final int limit, final String sortBy) { |
| |
| final RDBTableMetaData tmd = getTable(collection); |
| Set<String> allowedProps = Sets.intersection(INDEXEDPROPERTIES, tmd.getColumnProperties()); |
| for (QueryCondition cond : conditions) { |
| if (!allowedProps.contains(cond.getPropertyName())) { |
| String message = "indexed property " + cond.getPropertyName() + " not supported, query was '" + cond |
| + "'; supported properties are " + allowedProps; |
| LOG.info(message); |
| throw new UnsupportedIndexedPropertyException(message); |
| } |
| } |
| |
| final String from = collection == Collection.NODES && NodeDocument.MIN_ID_VALUE.equals(fromKey) ? null : fromKey; |
| final String to = collection == Collection.NODES && NodeDocument.MAX_ID_VALUE.equals(toKey) ? null : toKey; |
| |
| return new MyCloseableIterable<T>() { |
| |
| Set<Iterator<RDBRow>> returned = Sets.newHashSet(); |
| |
| @Override |
| public Iterator<T> iterator() { |
| try { |
| Iterator<RDBRow> res = db.queryAsIterator(ch, tmd, from, to, excludeKeyPatterns, conditions, |
| limit, sortBy); |
| returned.add(res); |
| Iterator<T> tmp = Iterators.transform(res, new Function<RDBRow, T>() { |
| @Override |
| public T apply(RDBRow input) { |
| return convertFromDBObject(collection, input); |
| } |
| }); |
| return CloseableIterator.wrap(tmp, (Closeable) res); |
| } catch (SQLException ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| for (Iterator<RDBRow> rdbi : returned) { |
| if (rdbi instanceof Closeable) { |
| ((Closeable) rdbi).close(); |
| } |
| } |
| } |
| }; |
| } |
| |
| protected <T extends Document> long queryCount(final Collection<T> collection, String fromKey, String toKey, |
| final List<String> excludeKeyPatterns, final List<QueryCondition> conditions) { |
| |
| return internalGetAggregate(collection, "COUNT", "*", fromKey, toKey, excludeKeyPatterns, conditions); |
| } |
| |
| protected <T extends Document> long getMinValue(final Collection<T> collection, String field, String fromKey, String toKey, |
| final List<String> excludeKeyPatterns, final List<QueryCondition> conditions) { |
| |
| return internalGetAggregate(collection, "MIN", field, fromKey, toKey, excludeKeyPatterns, conditions); |
| } |
| |
| private <T extends Document> long internalGetAggregate(final Collection<T> collection, final String aggregrate, String field, |
| String fromKey, String toKey, final List<String> excludeKeyPatterns, final List<QueryCondition> conditions) { |
| |
| final RDBTableMetaData tmd = getTable(collection); |
| for (QueryCondition cond : conditions) { |
| if (!INDEXEDPROPERTIES.contains(cond.getPropertyName())) { |
| String message = "indexed property " + cond.getPropertyName() + " not supported, query was '" + cond |
| + "'; supported properties are " + INDEXEDPROPERTIES; |
| LOG.info(message); |
| throw new DocumentStoreException(message); |
| } |
| } |
| |
| final String from = collection == Collection.NODES && NodeDocument.MIN_ID_VALUE.equals(fromKey) ? null : fromKey; |
| final String to = collection == Collection.NODES && NodeDocument.MAX_ID_VALUE.equals(toKey) ? null : toKey; |
| |
| Connection connection = null; |
| try { |
| connection = ch.getROConnection(); |
| long result = db.getLong(connection, tmd, aggregrate, field, from, to, excludeKeyPatterns, conditions); |
| connection.commit(); |
| return result; |
| } catch (SQLException ex) { |
| LOG.error("SQL exception on query", ex); |
| throw asDocumentStoreException(ex, "SQL exception on query"); |
| } finally { |
| this.ch.closeConnection(connection); |
| } |
| } |
| |
| @NotNull |
| protected <T extends Document> RDBTableMetaData getTable(Collection<T> collection) { |
| RDBTableMetaData tmd = this.tableMeta.get(collection); |
| if (tmd != null) { |
| return tmd; |
| } else { |
| throw new IllegalArgumentException("Unknown collection: " + collection.toString()); |
| } |
| } |
| |
| @Nullable |
| private <T extends Document> T readDocumentUncached(Collection<T> collection, String id, NodeDocument cachedDoc) { |
| Connection connection = null; |
| RDBTableMetaData tmd = getTable(collection); |
| final Stopwatch watch = startWatch(); |
| boolean docFound = true; |
| try { |
| long lastmodcount = -1, lastmodified = -1; |
| if (cachedDoc != null) { |
| lastmodcount = modcountOf(cachedDoc); |
| lastmodified = modifiedOf(cachedDoc); |
| } |
| connection = this.ch.getROConnection(); |
| RDBRow row = db.read(connection, tmd, id, lastmodcount, lastmodified); |
| connection.commit(); |
| if (row == null) { |
| docFound = false; |
| return null; |
| } else { |
| if (lastmodcount == row.getModcount() && lastmodified == row.getModified() && lastmodified >= 1) { |
| // we can re-use the cached document |
| cachedDoc.markUpToDate(System.currentTimeMillis()); |
| return castAsT(cachedDoc); |
| } else { |
| return convertFromDBObject(collection, row); |
| } |
| } |
| } catch (Exception ex) { |
| throw asDocumentStoreException(ex, "exception while reading " + id); |
| } finally { |
| this.ch.closeConnection(connection); |
| stats.doneFindUncached(watch.elapsed(TimeUnit.NANOSECONDS), collection, id, docFound, false); |
| } |
| } |
| |
| private <T extends Document> void delete(Collection<T> collection, String id) { |
| Connection connection = null; |
| RDBTableMetaData tmd = getTable(collection); |
| Stopwatch watch = startWatch(); |
| try { |
| connection = this.ch.getRWConnection(); |
| db.delete(connection, tmd, Collections.singletonList(id)); |
| connection.commit(); |
| } catch (Exception ex) { |
| this.ch.rollbackConnection(connection); |
| throw handleException("removing " + id, ex, collection, id); |
| } finally { |
| this.ch.closeConnection(connection); |
| stats.doneRemove(watch.elapsed(TimeUnit.NANOSECONDS), collection, 1); |
| } |
| } |
| |
| private <T extends Document> int delete(Collection<T> collection, List<String> ids) { |
| int numDeleted = 0; |
| RDBTableMetaData tmd = getTable(collection); |
| for (List<String> sublist : Lists.partition(ids, 64)) { |
| Connection connection = null; |
| Stopwatch watch = startWatch(); |
| try { |
| connection = this.ch.getRWConnection(); |
| numDeleted += db.delete(connection, tmd, sublist); |
| connection.commit(); |
| } catch (Exception ex) { |
| this.ch.rollbackConnection(connection); |
| throw handleException("removing " + ids, ex, collection, ids); |
| } finally { |
| this.ch.closeConnection(connection); |
| stats.doneRemove(watch.elapsed(TimeUnit.NANOSECONDS), collection, ids.size()); |
| } |
| } |
| return numDeleted; |
| } |
| |
| private <T extends Document> int delete(Collection<T> collection, Map<String, Long> toRemove) { |
| int numDeleted = 0; |
| RDBTableMetaData tmd = getTable(collection); |
| Map<String, Long> subMap = Maps.newHashMap(); |
| Iterator<Entry<String, Long>> it = toRemove.entrySet().iterator(); |
| while (it.hasNext()) { |
| Entry<String, Long> entry = it.next(); |
| subMap.put(entry.getKey(), entry.getValue()); |
| if (subMap.size() == 64 || !it.hasNext()) { |
| Connection connection = null; |
| int num = 0; |
| Stopwatch watch = startWatch(); |
| try { |
| connection = this.ch.getRWConnection(); |
| num = db.delete(connection, tmd, subMap); |
| numDeleted += num; |
| connection.commit(); |
| } catch (Exception ex) { |
| this.ch.rollbackConnection(connection); |
| Set<String> ids = subMap.keySet(); |
| throw handleException("deleting " + ids, ex, collection, ids); |
| } finally { |
| this.ch.closeConnection(connection); |
| stats.doneRemove(watch.elapsed(TimeUnit.NANOSECONDS), collection, num); |
| } |
| subMap.clear(); |
| } |
| } |
| return numDeleted; |
| } |
| |
| private <T extends Document> int deleteWithCondition(Collection<T> collection, List<QueryCondition> conditions) { |
| int numDeleted = 0; |
| RDBTableMetaData tmd = getTable(collection); |
| Stopwatch watch = startWatch(); |
| Connection connection = null; |
| try { |
| connection = this.ch.getRWConnection(); |
| numDeleted = db.deleteWithCondition(connection, tmd, conditions); |
| connection.commit(); |
| } catch (Exception ex) { |
| this.ch.rollbackConnection(connection); |
| throw asDocumentStoreException(ex, "deleting " + collection + ": " + conditions); |
| } finally { |
| this.ch.closeConnection(connection); |
| stats.doneRemove(watch.elapsed(TimeUnit.NANOSECONDS), collection, numDeleted); |
| } |
| return numDeleted; |
| } |
| |
| private <T extends Document> boolean updateDocument(@NotNull Collection<T> collection, @NotNull T document, |
| @NotNull UpdateOp update, Long oldmodcount) { |
| Connection connection = null; |
| RDBTableMetaData tmd = getTable(collection); |
| String data = null; |
| try { |
| connection = this.ch.getRWConnection(); |
| Number hasBinary = (Number) document.get(NodeDocument.HAS_BINARY_FLAG); |
| Boolean deletedOnce = (Boolean) document.get(NodeDocument.DELETED_ONCE); |
| Long modcount = (Long) document.get(MODCOUNT); |
| Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT); |
| boolean success = false; |
| boolean shouldRetry = true; |
| |
| // every 16th update is a full rewrite |
| if (isAppendableUpdate(update) && modcount % 16 != 0) { |
| String appendData = ser.asString(update, tmd.getColumnOnlyProperties()); |
| if (appendData.length() < tmd.getDataLimitInOctets() / CHAR2OCTETRATIO) { |
| try { |
| Operation modOperation = update.getChanges().get(MODIFIEDKEY); |
| long modified = getModifiedFromOperation(modOperation); |
| boolean modifiedIsConditional = modOperation == null || modOperation.type != UpdateOp.Operation.Type.SET; |
| success = db.appendingUpdate(connection, tmd, document.getId(), modified, modifiedIsConditional, hasBinary, |
| deletedOnce, modcount, cmodcount, oldmodcount, appendData); |
| // if we get here, a retry is not going to help (the SQL |
| // operation succeeded but simply did not select a row |
| // that could be updated, likely because of the check on |
| // MODCOUNT |
| shouldRetry = false; |
| connection.commit(); |
| } catch (SQLException ex) { |
| continueIfStringOverflow(ex); |
| this.ch.rollbackConnection(connection); |
| success = false; |
| } |
| } |
| } |
| if (!success && shouldRetry) { |
| data = ser.asString(document, tmd.getColumnOnlyProperties()); |
| Object m = document.get(MODIFIED); |
| long modified = (m instanceof Long) ? ((Long)m).longValue() : 0; |
| success = db.update(connection, tmd, document.getId(), modified, hasBinary, deletedOnce, modcount, cmodcount, |
| oldmodcount, data); |
| connection.commit(); |
| } |
| return success; |
| } catch (SQLException ex) { |
| this.ch.rollbackConnection(connection); |
| String addDiags = ""; |
| if (data != null && RDBJDBCTools.matchesSQLState(ex, "22", "72")) { |
| byte[] bytes = asBytes(data); |
| addDiags = String.format(" (DATA size in Java characters: %d, in octets: %d, computed character limit: %d)", |
| data.length(), bytes.length, tmd.getDataLimitInOctets() / CHAR2OCTETRATIO); |
| } |
| String message = String.format("Update for %s failed%s", document.getId(), addDiags); |
| LOG.debug(message, ex); |
| throw handleException(message, ex, collection, document.getId()); |
| } finally { |
| this.ch.closeConnection(connection); |
| } |
| } |
| |
| private static void continueIfStringOverflow(SQLException ex) throws SQLException { |
| String state = ex.getSQLState(); |
| if ("22001".equals(state) /* everybody */|| ("72000".equals(state) && 1489 == ex.getErrorCode()) /* Oracle */ |
| || ("S0001".equals(state) && 2628 == ex.getErrorCode()) /* MSSQL update*/) { |
| // ok |
| } else { |
| throw (ex); |
| } |
| } |
| |
| private static boolean isAppendableUpdate(UpdateOp update) { |
| return NOAPPEND == false; |
| } |
| |
| private static long getModifiedFromOperation(Operation op) { |
| return op == null ? 0L : Long.parseLong(op.value.toString()); |
| } |
| |
| private <T extends Document> boolean insertDocuments(Collection<T> collection, List<T> documents) { |
| Connection connection = null; |
| RDBTableMetaData tmd = getTable(collection); |
| try { |
| connection = this.ch.getRWConnection(); |
| Set<String> insertedKeys = db.insert(connection, tmd, documents); |
| connection.commit(); |
| return insertedKeys.size() == documents.size(); |
| } catch (SQLException ex) { |
| this.ch.rollbackConnection(connection); |
| |
| List<String> ids = new ArrayList<String>(); |
| for (T doc : documents) { |
| ids.add(doc.getId()); |
| } |
| String message = String.format("insert of %s failed", ids); |
| LOG.debug(message, ex); |
| |
| // collect additional exceptions |
| String messages = LOG.isDebugEnabled() ? RDBJDBCTools.getAdditionalMessages(ex) : ""; |
| |
| // see whether a DATA error was involved |
| boolean dataRelated = false; |
| SQLException walk = ex; |
| while (walk != null && !dataRelated) { |
| dataRelated = RDBJDBCTools.matchesSQLState(walk, "22", "72"); |
| walk = walk.getNextException(); |
| } |
| if (dataRelated) { |
| String id = null; |
| int longest = 0, longestChars = 0; |
| |
| for (Document d : documents) { |
| String data = ser.asString(d, tmd.getColumnOnlyProperties()); |
| byte bytes[] = asBytes(data); |
| if (bytes.length > longest) { |
| longest = bytes.length; |
| longestChars = data.length(); |
| id = d.getId(); |
| } |
| } |
| |
| String m = String |
| .format(" (potential cause: long data for ID %s - longest octet DATA size in Java characters: %d, in octets: %d, computed character limit: %d)", |
| id, longest, longestChars, tmd.getDataLimitInOctets() / CHAR2OCTETRATIO); |
| messages += m; |
| } |
| |
| if (!messages.isEmpty()) { |
| LOG.debug("additional diagnostics: " + messages); |
| } |
| |
| throw handleException(message, ex, collection, ids); |
| } finally { |
| this.ch.closeConnection(connection); |
| } |
| } |
| |
| // configuration |
| |
| // Whether to use GZIP compression |
| private static final boolean NOGZIP = SystemPropertySupplier |
| .create("org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.NOGZIP", Boolean.FALSE).loggingTo(LOG).get(); |
| |
| // Whether to use append operations (string concatenation) in the DATA column |
| private static final boolean NOAPPEND = SystemPropertySupplier |
| .create("org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.NOAPPEND", Boolean.FALSE).loggingTo(LOG).get(); |
| |
| // Number of documents to insert at once for batch create |
| private static final int CHUNKSIZE = SystemPropertySupplier |
| .create("org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.CHUNKSIZE", 64).loggingTo(LOG) |
| .validateWith(value -> value > 0).get(); |
| |
| // Number of query hits above which a diagnostic warning is generated |
| private static final int QUERYHITSLIMIT = SystemPropertySupplier |
| .create("org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.QUERYHITSLIMIT", 4096).loggingTo(LOG) |
| .validateWith(value -> value > 0).get(); |
| |
| // Number of elapsed ms in a query above which a diagnostic warning is generated |
| private static final int QUERYTIMELIMIT = SystemPropertySupplier |
| .create("org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.QUERYTIMELIMIT", 10000).loggingTo(LOG) |
| .validateWith(value -> value > 0).get(); |
| |
| // Whether to use JDBC batch commands for the createOrUpdate (default: true) |
| private static final boolean BATCHUPDATES = SystemPropertySupplier |
| .create(RDBDocumentStore.class.getName() + ".BATCHUPDATES", Boolean.TRUE).loggingTo(LOG) |
| .formatSetMessage((name, value) -> { |
| return String.format("Batch updates disabled (system property %s set to '%s')", name, value); |
| }).get(); |
| |
| public static byte[] asBytes(@NotNull String data) { |
| byte[] bytes; |
| try { |
| bytes = data.getBytes("UTF-8"); |
| } catch (UnsupportedEncodingException ex) { |
| LOG.error("UTF-8 not supported??", ex); |
| throw asDocumentStoreException(ex, "UTF-8 not supported??"); |
| } |
| |
| if (NOGZIP) { |
| return bytes; |
| } else { |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(bytes.length / 2); |
| try (GZIPOutputStream gos = asGZIPOutputStream(bos, Deflater.BEST_SPEED)) { |
| gos.write(bytes); |
| gos.close(); |
| byte[] compressedBytes = bos.toByteArray(); |
| if (LOG.isTraceEnabled()) { |
| long ratio = (100L * compressedBytes.length) / bytes.length; |
| LOG.trace("Gzipped {} bytes to {} ({}%)", bytes.length, compressedBytes.length, ratio); |
| } |
| return compressedBytes; |
| } catch (IOException ex) { |
| LOG.error("Error while gzipping contents", ex); |
| throw asDocumentStoreException(ex, "Error while gzipping contents"); |
| } |
| } |
| } |
| |
| private static GZIPOutputStream asGZIPOutputStream(OutputStream os, final int level) throws IOException { |
| return new GZIPOutputStream(os) { |
| { |
| this.def.setLevel(level); |
| } |
| }; |
| } |
| |
| @Override |
| public void setReadWriteMode(String readWriteMode) { |
| // ignored |
| } |
| |
| public void setStatsCollector(DocumentStoreStatsCollector stats) { |
| this.stats = stats; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static <T extends Document> T castAsT(NodeDocument doc) { |
| return (T) doc; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static <T extends Document> List<NodeDocument> castAsNodeDocumentList(List<T> list) { |
| return (List<NodeDocument>) list; |
| } |
| |
| private NodeDocumentCache nodesCache; |
| |
| private NodeDocumentLocks locks; |
| |
| @Nullable |
| private static NodeDocument unwrap(@NotNull NodeDocument doc) { |
| return doc == NodeDocument.NULL ? null : doc; |
| } |
| |
| @NotNull |
| private static NodeDocument wrap(@Nullable NodeDocument doc) { |
| return doc == null ? NodeDocument.NULL : doc; |
| } |
| |
| @NotNull |
| private static String idOf(@NotNull Document doc) { |
| String id = doc.getId(); |
| if (id == null) { |
| throw new IllegalArgumentException("non-null ID expected"); |
| } |
| return id; |
| } |
| |
| private static long modcountOf(@NotNull Document doc) { |
| Long n = doc.getModCount(); |
| return n != null ? n : -1; |
| } |
| |
| private static long modifiedOf(@NotNull Document doc) { |
| Object l = doc.get(NodeDocument.MODIFIED_IN_SECS); |
| return (l instanceof Long) ? ((Long)l).longValue() : -1; |
| } |
| |
| @NotNull |
| protected <T extends Document> T convertFromDBObject(@NotNull Collection<T> collection, @NotNull RDBRow row) { |
| // this method is present here in order to facilitate unit testing for OAK-3566 |
| return ser.fromRow(collection, row); |
| } |
| |
| private static boolean hasChangesToCollisions(UpdateOp update) { |
| if (!USECMODCOUNT) { |
| return false; |
| } else { |
| for (Entry<Key, Operation> e : checkNotNull(update).getChanges().entrySet()) { |
| Key k = e.getKey(); |
| Operation op = e.getValue(); |
| if (op.type == Operation.Type.SET_MAP_ENTRY) { |
| if (NodeDocument.COLLISIONS.equals(k.getName())) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| } |
| |
| @NotNull |
| private static <T extends Document> String dumpKeysAndModcounts(Map<String, T> docs) { |
| if (docs.isEmpty()) { |
| return "-"; |
| } else { |
| StringBuilder sb = new StringBuilder(); |
| for (Map.Entry<String, T> e : docs.entrySet()) { |
| Long mc = e.getValue().getModCount(); |
| if (sb.length() != 0) { |
| sb.append(", "); |
| } |
| sb.append(String.format("%s (%s)", e.getKey(), mc == null ? "" : mc.toString())); |
| } |
| return sb.toString(); |
| } |
| } |
| |
| // keeping track of CLUSTER_NODES updates |
| private Map<String, Long> cnUpdates = new ConcurrentHashMap<String, Long>(); |
| |
| private <T extends Document> void maintainUpdateStats(Collection<T> collection, String key) { |
| if (collection == Collection.CLUSTER_NODES) { |
| synchronized (this) { |
| Long old = cnUpdates.get(key); |
| old = old == null ? Long.valueOf(1) : old + 1; |
| cnUpdates.put(key, old); |
| } |
| } |
| } |
| |
| private String getCnStats() { |
| if (cnUpdates.isEmpty()) { |
| return ""; |
| } else { |
| List<Map.Entry<String, Long>> tmp = new ArrayList<>(cnUpdates.entrySet()); |
| Collections.sort(tmp, (Entry<String, Long> o1, Entry<String, Long> o2) -> o1.getKey().compareTo(o2.getKey())); |
| return " (Cluster Node updates: " + tmp.toString() + ")"; |
| } |
| } |
| |
| private Stopwatch startWatch() { |
| return Stopwatch.createStarted(); |
| } |
| |
| protected NodeDocumentCache getNodeDocumentCache() { |
| return nodesCache; |
| } |
| |
| private <T extends Document> DocumentStoreException handleException(String message, Exception ex, Collection<T> collection, |
| java.util.Collection<String> ids) { |
| if (collection == Collection.NODES) { |
| for (String id : ids) { |
| invalidateCache(collection, id, false); |
| } |
| } |
| return asDocumentStoreException(ex, message); |
| } |
| |
| private <T extends Document> DocumentStoreException handleException(String message, Exception ex, Collection<T> collection, |
| String id) { |
| return handleException(message, ex, collection, Collections.singleton(id)); |
| } |
| |
| protected class UnsupportedIndexedPropertyException extends DocumentStoreException { |
| |
| private static final long serialVersionUID = -8392572622365260105L; |
| |
| public UnsupportedIndexedPropertyException(String message) { |
| super(message); |
| } |
| } |
| |
| private CacheLock acquireLockFor(String id) { |
| return new CacheLock(this.locks, id); |
| } |
| |
| private static class CacheLock implements AutoCloseable { |
| |
| private final Lock lock; |
| |
| public CacheLock(NodeDocumentLocks locks, String id) { |
| this.lock = locks.acquire(id); |
| } |
| |
| @Override |
| public void close() { |
| lock.unlock(); |
| } |
| } |
| |
| // slightly extended query support |
| protected static class QueryCondition { |
| |
| private final String propertyName, operator; |
| private final List<? extends Object> operands; |
| |
| public QueryCondition(String propertyName, String operator, long value) { |
| this.propertyName = propertyName; |
| this.operator = operator; |
| this.operands = Collections.singletonList(value); |
| } |
| |
| public QueryCondition(String propertyName, String operator, List<? extends Object> values) { |
| this.propertyName = propertyName; |
| this.operator = operator; |
| this.operands = values; |
| } |
| |
| public QueryCondition(String propertyName, String operator) { |
| this.propertyName = propertyName; |
| this.operator = operator; |
| this.operands = Collections.emptyList(); |
| } |
| |
| public String getPropertyName() { |
| return propertyName; |
| } |
| |
| public String getOperator() { |
| return operator; |
| } |
| |
| public List<? extends Object> getOperands() { |
| return this.operands; |
| } |
| |
| @Override |
| public String toString() { |
| if (this.operands.isEmpty()) { |
| return String.format("%s %s", propertyName, operator); |
| } else if (this.operands.size() == 1) { |
| return String.format("%s %s %s", propertyName, operator, operands.get(0).toString()); |
| } else { |
| return String.format("%s %s %s", propertyName, operator, operands.toString()); |
| } |
| } |
| } |
| } |