/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document.rdb;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.asDocumentStoreException;
import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJSONSupport.appendJsonMember;
import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJSONSupport.appendJsonString;
import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJSONSupport.appendJsonValue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.oak.commons.json.JsopReader;
import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.Document;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.Revision;
import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Serialization/Parsing of documents.
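* <p>
* Illustrative usage sketch (variable names are invented for this example):
* <pre>
* RDBDocumentSerializer ser = new RDBDocumentSerializer(documentStore);
* // JSON for everything that is not stored in a dedicated table column
* String data = ser.asString(doc, columnProperties);
* // later, when reading back a row
* NodeDocument restored = ser.fromRow(Collection.NODES, row);
* </pre>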
*/
public class RDBDocumentSerializer {
private final DocumentStore store;
private static final String MODIFIED = NodeDocument.MODIFIED_IN_SECS;
private static final String MODCOUNT = NodeDocument.MOD_COUNT;
private static final String CMODCOUNT = "_collisionsModCount";
private static final String SDTYPE = NodeDocument.SD_TYPE;
private static final String SDMAXREVTIME = NodeDocument.SD_MAX_REV_TIME_IN_SECS;
private static final String ID = "_id";
private static final String HASBINARY = NodeDocument.HAS_BINARY_FLAG;
private static final String DELETEDONCE = NodeDocument.DELETED_ONCE;
private final Comparator<Revision> comparator = StableRevisionComparator.REVERSE;
private static final Logger LOG = LoggerFactory.getLogger(RDBDocumentSerializer.class);
private static final RDBJSONSupport JSON = new RDBJSONSupport(true);
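/**
 * Creates a serializer; the given {@link DocumentStore} is only used to
 * instantiate new documents via {@link Collection#newDocument(DocumentStore)}
 * in {@link #fromRow(Collection, RDBRow)}.
 */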
public RDBDocumentSerializer(DocumentStore store) {
this.store = store;
}
/**
* Serializes all non-column properties of the {@link Document} into a JSON
* string.
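* <p>
* Purely illustrative example (property names invented, and which properties
* are column-mapped depends on the store configuration): for a document with
* {@code _id}, {@code _modCount} and {@code prop}, where the first two are in
* {@code columnProperties}, the result would be
* <pre>
* {"prop":"value"}
* </pre>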
*/
public String asString(@NotNull Document doc, Set<String> columnProperties) {
StringBuilder sb = new StringBuilder(32768);
sb.append("{");
boolean needComma = false;
for (Map.Entry<String, Object> entry : doc.entrySet()) {
String key = entry.getKey();
if (!columnProperties.contains(key)) {
if (needComma) {
sb.append(",");
}
appendJsonMember(sb, key, entry.getValue());
needComma = true;
}
}
sb.append("}");
return sb.toString();
}
/**
* Serializes the changes in the {@link UpdateOp} into a JSON array; each
* entry is another JSON array holding operation, key, revision, and value.
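* <p>
* Purely illustrative example (names and revision invented); operation codes
* are {@code "+"} (increment), {@code "="} (set), {@code "M"} (max) and
* {@code "*"} (remove), and the revision element is only present for map
* entries:
* <pre>
* [["=","foo","bar"],["+","counter",1],["*","_collisions","r123456789ab-0-1",null]]
* </pre>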
*/
public String asString(UpdateOp update, Set<String> columnProperties) {
StringBuilder sb = new StringBuilder("[");
boolean needComma = false;
for (Map.Entry<Key, Operation> change : update.getChanges().entrySet()) {
Operation op = change.getValue();
Key key = change.getKey();
// exclude properties that are serialized into special columns
if (columnProperties.contains(key.getName()) && null == key.getRevision())
continue;
if (needComma) {
sb.append(",");
}
sb.append("[");
if (op.type == UpdateOp.Operation.Type.INCREMENT) {
sb.append("\"+\",");
} else if (op.type == UpdateOp.Operation.Type.SET || op.type == UpdateOp.Operation.Type.SET_MAP_ENTRY) {
sb.append("\"=\",");
} else if (op.type == UpdateOp.Operation.Type.MAX) {
sb.append("\"M\",");
} else if (op.type == UpdateOp.Operation.Type.REMOVE || op.type == UpdateOp.Operation.Type.REMOVE_MAP_ENTRY) {
sb.append("\"*\",");
} else {
throw new DocumentStoreException("Can't serialize " + update.toString() + " for JSON append");
}
appendJsonString(sb, key.getName());
sb.append(",");
Revision rev = key.getRevision();
if (rev != null) {
appendJsonString(sb, rev.toString());
sb.append(",");
}
appendJsonValue(sb, op.value);
sb.append("]");
needComma = true;
}
return sb.append("]").toString();
}
/**
* Reconstructs a {@link Document} based on the persisted {@link RDBRow}.
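* <p>
* Two persisted layouts are handled: either the BDATA blob holds the
* serialized base document and the DATA string starts with the literal
* {@code "blob"}, or DATA itself starts with the full JSON document. In both
* cases DATA may be followed by comma-separated JSON arrays of update
* operations that are replayed on top of the base document.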
*/
@NotNull
public <T extends Document> T fromRow(@NotNull Collection<T> collection, @NotNull RDBRow row) throws DocumentStoreException {
final String charData = row.getData();
checkNotNull(charData, "RDBRow.getData() is null for collection " + collection + ", id: " + row.getId());
T doc = collection.newDocument(store);
doc.put(ID, row.getId());
if (row.getModified() != RDBRow.LONG_UNSET) {
doc.put(MODIFIED, row.getModified());
}
if (row.getModcount() != RDBRow.LONG_UNSET) {
doc.put(MODCOUNT, row.getModcount());
}
if (RDBDocumentStore.USECMODCOUNT && row.getCollisionsModcount() != RDBRow.LONG_UNSET) {
doc.put(CMODCOUNT, row.getCollisionsModcount());
}
if (row.hasBinaryProperties() != null) {
doc.put(HASBINARY, row.hasBinaryProperties().longValue());
}
if (row.deletedOnce() != null) {
doc.put(DELETEDONCE, row.deletedOnce().booleanValue());
}
if (row.getSchemaVersion() >= 2) {
if (row.getSdType() != RDBRow.LONG_UNSET) {
doc.put(SDTYPE, row.getSdType());
}
if (row.getSdMaxRevTime() != RDBRow.LONG_UNSET) {
doc.put(SDMAXREVTIME, row.getSdMaxRevTime());
}
}
byte[] bdata = row.getBdata();
boolean blobInUse = false;
JsopTokenizer json;
// case #1: BDATA (blob) contains base data, DATA (string) contains
// update operations
try {
if (bdata != null && bdata.length != 0) {
String s = fromBlobData(bdata);
json = new JsopTokenizer(s);
json.read('{');
readDocumentFromJson(json, doc);
json.read(JsopReader.END);
blobInUse = true;
}
} catch (Exception ex) {
throw asDocumentStoreException(ex, "parsing blob data as JSON");
}
json = new JsopTokenizer(charData);
// start processing the VARCHAR data
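// DATA either starts with the full JSON document ('{', no blob present) or
// with the string literal "blob" (base document was already read from BDATA
// above); either form may be followed by ","-separated JSON arrays of update
// operations that get applied on top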
try {
int next = json.read();
if (next == '{') {
if (blobInUse) {
throw new DocumentStoreException("expected literal \"blob\" but found: " + row.getData());
}
readDocumentFromJson(json, doc);
} else if (next == JsopReader.STRING) {
if (!blobInUse) {
throw new DocumentStoreException("did not expect \"blob\" here: " + row.getData());
}
if (!"blob".equals(json.getToken())) {
throw new DocumentStoreException("expected string literal \"blob\"");
}
} else {
throw new DocumentStoreException("unexpected token " + next + " in " + row.getData());
}
next = json.read();
if (next == ',') {
do {
Object ob = JSON.parse(json);
if (!(ob instanceof List)) {
throw new DocumentStoreException("expected array but got: " + ob);
}
@SuppressWarnings("unchecked")
List<List<Object>> update = (List<List<Object>>) ob;
for (List<Object> op : update) {
applyUpdate(doc, update, op);
}
} while (json.matches(','));
}
json.read(JsopReader.END);
// OAK-7855: check and fix _sdType
checkSdType(doc);
return doc;
} catch (Exception ex) {
String message = String.format("Error processing persisted data for document '%s'", row.getId());
if (charData.length() > 0) {
int last = charData.charAt(charData.length() - 1);
if (last != '}' && last != '"' && last != ']') {
message += " (DATA column might be truncated)";
}
}
LOG.error(message, ex);
throw asDocumentStoreException(ex, message);
}
}
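/**
 * Applies one deserialized update operation (as written by
 * {@link #asString(UpdateOp, Set)}) to the document: {@code "="} sets a
 * value, {@code "*"} removes it, {@code "+"} increments a {@link Long}, and
 * {@code "M"} keeps the maximum; when a revision is present, the operation
 * targets the corresponding entry of a revision-keyed sub-map.
 */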
private <T extends Document> void applyUpdate(T doc, List<List<Object>> updateString, List<Object> op) {
String opcode = op.get(0).toString();
String key = op.get(1).toString();
Revision rev = null;
Object value = null;
if (op.size() == 3) {
value = op.get(2);
} else {
rev = Revision.fromString(op.get(2).toString());
value = op.get(3);
}
Object old = doc.get(key);
if ("=".equals(opcode)) {
if (rev == null) {
doc.put(key, value);
} else {
@SuppressWarnings("unchecked")
Map<Revision, Object> m = (Map<Revision, Object>) old;
if (m == null) {
m = new TreeMap<Revision, Object>(comparator);
doc.put(key, m);
}
m.put(rev, value);
}
} else if ("*".equals(opcode)) {
if (rev == null) {
doc.remove(key);
} else {
@SuppressWarnings("unchecked")
Map<Revision, Object> m = (Map<Revision, Object>) old;
if (m != null) {
m.remove(rev);
}
}
} else if ("+".equals(opcode)) {
if (rev == null) {
Long x = (Long) value;
if (old == null) {
old = 0L;
}
doc.put(key, ((Long) old) + x);
} else {
throw new DocumentStoreException("unexpected operation " + op + " in: " + updateString);
}
} else if ("M".equals(opcode)) {
if (rev == null) {
@SuppressWarnings("unchecked")
Comparable<Object> newValue = (Comparable<Object>) value;
if (old == null || newValue.compareTo(old) > 0) {
doc.put(key, value);
}
} else {
throw new DocumentStoreException("unexpected operation " + op + " in: " + updateString);
}
} else {
throw new DocumentStoreException("unexpected operation " + op + " in: " + updateString);
}
}
/**
* Reads from an opened JSON stream ("{" already consumed) into a document.
*/
private static <T extends Document> void readDocumentFromJson(@NotNull JsopTokenizer json, @NotNull T doc) {
if (!json.matches('}')) {
do {
String key = json.readString();
json.read(':');
Object value = JSON.parse(json);
doc.put(key, value);
} while (json.matches(','));
json.read('}');
}
}
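/**
 * Removes an {@code _sdType} value of {@code 0}, which indicates data
 * affected by OAK-7855.
 */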
private static void checkSdType(Document doc) {
Object sdType = doc.get(NodeDocument.SD_TYPE);
if (sdType instanceof Long) {
long value = (long) sdType;
if (value == 0) {
doc.remove(NodeDocument.SD_TYPE);
LOG.debug("Incorrect _sdType 0 in {}", doc.getId());
}
}
}
// low level operations
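// GZIP magic bytes 0x1f 0x8b (31 and -117 as signed Java bytes)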
private static final byte[] GZIPSIG = { 31, -117 };
private static String fromBlobData(byte[] bdata) {
try {
if (bdata.length >= 2 && bdata[0] == GZIPSIG[0] && bdata[1] == GZIPSIG[1]) {
// GZIP
try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(bdata), 65536)) {
return IOUtils.toString(gis, "UTF-8");
}
} else {
return IOUtils.toString(bdata, "UTF-8");
}
} catch (IOException ex) {
LOG.debug("Unexpected exception while processing blob data", ex);
throw new RuntimeException(ex);
}
}
}