blob: b3eb07142a96b0a82f38b1d824e07a09826d730e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus;
import com.google.common.base.Preconditions;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.EdgeLabel;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphElement;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.core.log.TransactionRecovery;
import org.janusgraph.core.schema.ConsistencyModifier;
import org.janusgraph.core.schema.JanusGraphIndex;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
import org.janusgraph.core.schema.Mapping;
import org.janusgraph.core.schema.Parameter;
import org.janusgraph.core.schema.PropertyKeyMaker;
import org.janusgraph.diskstorage.BackendTransaction;
import org.janusgraph.diskstorage.indexing.IndexEntry;
import org.janusgraph.graphdb.database.IndexSerializer;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.graphdb.database.management.ManagementSystem;
import org.janusgraph.graphdb.internal.Token;
import org.janusgraph.graphdb.log.StandardTransactionLogProcessor;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.IndexType;
import org.janusgraph.graphdb.types.MixedIndexType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Janus implementation of AtlasGraphManagement.
*/
public class AtlasJanusGraphManagement implements AtlasGraphManagement {
private static final boolean lockEnabled = AtlasConfiguration.STORAGE_CONSISTENCY_LOCK_ENABLED.getBoolean();
private static final Parameter[] STRING_PARAMETER_ARRAY = new Parameter[]{Mapping.STRING.asParameter()};
private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphManagement.class);
private static final char[] RESERVED_CHARS = { '{', '}', '"', '$', Token.SEPARATOR_CHAR };
private AtlasJanusGraph graph;
private JanusGraphManagement management;
private Set<String> newMultProperties = new HashSet<>();
public AtlasJanusGraphManagement(AtlasJanusGraph graph, JanusGraphManagement managementSystem) {
this.management = managementSystem;
this.graph = graph;
}
@Override
public void createVertexMixedIndex(String indexName, String backingIndex, List<AtlasPropertyKey> propertyKeys) {
IndexBuilder indexBuilder = management.buildIndex(indexName, Vertex.class);
for (AtlasPropertyKey key : propertyKeys) {
PropertyKey janusKey = AtlasJanusObjectFactory.createPropertyKey(key);
indexBuilder.addKey(janusKey);
}
indexBuilder.buildMixedIndex(backingIndex);
}
@Override
public void createEdgeMixedIndex(String indexName, String backingIndex, List<AtlasPropertyKey> propertyKeys) {
IndexBuilder indexBuilder = management.buildIndex(indexName, Edge.class);
for (AtlasPropertyKey key : propertyKeys) {
PropertyKey janusKey = AtlasJanusObjectFactory.createPropertyKey(key);
indexBuilder.addKey(janusKey);
}
indexBuilder.buildMixedIndex(backingIndex);
}
@Override
public void createEdgeIndex(String label, String indexName, AtlasEdgeDirection edgeDirection, List<AtlasPropertyKey> propertyKeys) {
EdgeLabel edgeLabel = management.getEdgeLabel(label);
if (edgeLabel == null) {
edgeLabel = management.makeEdgeLabel(label).make();
}
Direction direction = AtlasJanusObjectFactory.createDirection(edgeDirection);
PropertyKey[] keys = AtlasJanusObjectFactory.createPropertyKeys(propertyKeys);
if (management.getRelationIndex(edgeLabel, indexName) == null) {
management.buildEdgeIndex(edgeLabel, indexName, direction, keys);
}
}
@Override
public void createFullTextMixedIndex(String indexName, String backingIndex, List<AtlasPropertyKey> propertyKeys) {
IndexBuilder indexBuilder = management.buildIndex(indexName, Vertex.class);
for (AtlasPropertyKey key : propertyKeys) {
PropertyKey janusKey = AtlasJanusObjectFactory.createPropertyKey(key);
indexBuilder.addKey(janusKey, org.janusgraph.core.schema.Parameter.of("mapping", Mapping.TEXT));
}
indexBuilder.buildMixedIndex(backingIndex);
}
@Override
public boolean containsPropertyKey(String propertyName) {
return management.containsPropertyKey(propertyName);
}
@Override
public void rollback() {
management.rollback();
}
@Override
public void commit() {
graph.addMultiProperties(newMultProperties);
newMultProperties.clear();
management.commit();
}
private static void checkName(String name) {
//for some reason, name checking was removed from StandardPropertyKeyMaker.make()
//in Janus. For consistency, do the check here.
Preconditions.checkArgument(StringUtils.isNotBlank(name), "Need to specify name");
for (char c : RESERVED_CHARS) {
Preconditions.checkArgument(name.indexOf(c) < 0, "Name can not contains reserved character %s: %s", c, name);
}
}
@Override
public AtlasPropertyKey makePropertyKey(String propertyName, Class propertyClass, AtlasCardinality cardinality) {
if (cardinality.isMany()) {
newMultProperties.add(propertyName);
}
PropertyKeyMaker propertyKeyBuilder = management.makePropertyKey(propertyName).dataType(propertyClass);
if (cardinality != null) {
Cardinality janusCardinality = AtlasJanusObjectFactory.createCardinality(cardinality);
propertyKeyBuilder.cardinality(janusCardinality);
}
PropertyKey propertyKey = propertyKeyBuilder.make();
return GraphDbObjectFactory.createPropertyKey(propertyKey);
}
@Override
public AtlasEdgeLabel makeEdgeLabel(String label) {
EdgeLabel edgeLabel = management.makeEdgeLabel(label).make();
return GraphDbObjectFactory.createEdgeLabel(edgeLabel);
}
@Override
public void deletePropertyKey(String propertyKey) {
PropertyKey janusPropertyKey = management.getPropertyKey(propertyKey);
if (null == janusPropertyKey) return;
for (int i = 0;; i++) {
String deletedKeyName = janusPropertyKey + "_deleted_" + i;
if (null == management.getPropertyKey(deletedKeyName)) {
management.changeName(janusPropertyKey, deletedKeyName);
break;
}
}
}
@Override
public AtlasPropertyKey getPropertyKey(String propertyName) {
checkName(propertyName);
return GraphDbObjectFactory.createPropertyKey(management.getPropertyKey(propertyName));
}
@Override
public AtlasEdgeLabel getEdgeLabel(String label) {
return GraphDbObjectFactory.createEdgeLabel(management.getEdgeLabel(label));
}
@Override
public String addMixedIndex(String indexName, AtlasPropertyKey propertyKey, boolean isStringField) {
PropertyKey janusKey = AtlasJanusObjectFactory.createPropertyKey(propertyKey);
JanusGraphIndex janusGraphIndex = management.getGraphIndex(indexName);
if(isStringField) {
management.addIndexKey(janusGraphIndex, janusKey, Mapping.STRING.asParameter());
LOG.debug("created a string type for {} with janueKey {}.", propertyKey.getName(), janusKey);
} else {
management.addIndexKey(janusGraphIndex, janusKey);
LOG.debug("created a default type for {} with janueKey {}.", propertyKey.getName(), janusKey);
}
String encodedName = "";
if(isStringField) {
encodedName = graph.getIndexFieldName(propertyKey, janusGraphIndex, STRING_PARAMETER_ARRAY);
} else {
encodedName = graph.getIndexFieldName(propertyKey, janusGraphIndex);
}
LOG.info("property '{}' is encoded to '{}'.", propertyKey.getName(), encodedName);
return encodedName;
}
@Override
public String getIndexFieldName(String indexName, AtlasPropertyKey propertyKey, boolean isStringField) {
JanusGraphIndex janusGraphIndex = management.getGraphIndex(indexName);
if(isStringField) {
return graph.getIndexFieldName(propertyKey, janusGraphIndex, STRING_PARAMETER_ARRAY);
} else {
return graph.getIndexFieldName(propertyKey, janusGraphIndex);
}
}
public AtlasGraphIndex getGraphIndex(String indexName) {
JanusGraphIndex index = management.getGraphIndex(indexName);
return GraphDbObjectFactory.createGraphIndex(index);
}
@Override
public boolean edgeIndexExist(String label, String indexName) {
EdgeLabel edgeLabel = management.getEdgeLabel(label);
return edgeLabel != null && management.getRelationIndex(edgeLabel, indexName) != null;
}
@Override
public void createVertexCompositeIndex(String propertyName, boolean isUnique, List<AtlasPropertyKey> propertyKeys) {
createCompositeIndex(propertyName, isUnique, propertyKeys, Vertex.class);
}
@Override
public void createEdgeCompositeIndex(String propertyName, boolean isUnique, List<AtlasPropertyKey> propertyKeys) {
createCompositeIndex(propertyName, isUnique, propertyKeys, Edge.class);
}
private void createCompositeIndex(String propertyName, boolean isUnique, List<AtlasPropertyKey> propertyKeys, Class<? extends Element> elementType) {
IndexBuilder indexBuilder = management.buildIndex(propertyName, elementType);
for (AtlasPropertyKey key : propertyKeys) {
PropertyKey janusKey = AtlasJanusObjectFactory.createPropertyKey(key);
indexBuilder.addKey(janusKey);
}
if (isUnique) {
indexBuilder.unique();
}
JanusGraphIndex index = indexBuilder.buildCompositeIndex();
if (lockEnabled && isUnique) {
management.setConsistency(index, ConsistencyModifier.LOCK);
}
}
@Override
public void updateUniqueIndexesForConsistencyLock() {
try {
setConsistency(this.management, Vertex.class);
setConsistency(this.management, Edge.class);
} finally {
commit();
}
}
private static void setConsistency(JanusGraphManagement mgmt, Class<? extends Element> elementType) {
LOG.info("setConsistency: {}: Starting...", elementType.getSimpleName());
int count = 0;
try {
Iterable<JanusGraphIndex> iterable = mgmt.getGraphIndexes(elementType);
for (JanusGraphIndex index : iterable) {
if (!index.isCompositeIndex() || !index.isUnique() || mgmt.getConsistency(index) == ConsistencyModifier.LOCK) {
continue;
}
for (PropertyKey propertyKey : index.getFieldKeys()) {
LOG.info("setConsistency: {}: {}", count, propertyKey.name());
}
mgmt.setConsistency(index, ConsistencyModifier.LOCK);
count++;
}
}
finally {
LOG.info("setConsistency: {}: {}: Done!", elementType.getSimpleName(), count);
}
}
@Override
public void reindex(String indexName, List<AtlasElement> elements) throws Exception {
try {
JanusGraphIndex index = management.getGraphIndex(indexName);
if (index == null || !(management instanceof ManagementSystem) || !(graph.getGraph() instanceof StandardJanusGraph)) {
LOG.error("Could not retrieve index for name: {} ", indexName);
return;
}
ManagementSystem managementSystem = (ManagementSystem) management;
IndexType indexType = managementSystem.getSchemaVertex(index).asIndexType();
if (!(indexType instanceof MixedIndexType)) {
LOG.warn("Index: {}: Not of MixedIndexType ", indexName);
return;
}
IndexSerializer indexSerializer = ((StandardJanusGraph) graph.getGraph()).getIndexSerializer();
reindexElement(managementSystem, indexSerializer, (MixedIndexType) indexType, elements);
} catch (Exception exception) {
throw exception;
} finally {
management.commit();
}
}
@Override
public Object startIndexRecovery(long recoveryStartTime) {
Instant recoveryStartInstant = Instant.ofEpochMilli(recoveryStartTime);
JanusGraph janusGraph = this.graph.getGraph();
return JanusGraphFactory.startTransactionRecovery(janusGraph, recoveryStartInstant);
}
@Override
public void stopIndexRecovery(Object txRecoveryObject) {
if (txRecoveryObject == null) {
return;
}
try {
if (txRecoveryObject instanceof TransactionRecovery) {
TransactionRecovery txRecovery = (TransactionRecovery) txRecoveryObject;
StandardJanusGraph janusGraph = (StandardJanusGraph) this.graph.getGraph();
LOG.info("stopIndexRecovery: Index Client is unhealthy. Index recovery: Paused!");
janusGraph.getBackend().getSystemTxLog().close();
txRecovery.shutdown();
} else {
LOG.error("stopIndexRecovery({}): Invalid transaction recovery object!", txRecoveryObject);
}
} catch (Exception e) {
LOG.warn("stopIndexRecovery: Error while shutting down transaction recovery", e);
}
}
@Override
public void printIndexRecoveryStats(Object txRecoveryObject) {
if (txRecoveryObject == null) {
return;
}
try {
if (txRecoveryObject instanceof TransactionRecovery) {
StandardTransactionLogProcessor txRecovery = (StandardTransactionLogProcessor) txRecoveryObject;
long[] statistics = txRecovery.getStatistics();
if (statistics.length >= 2) {
LOG.info("Index Recovery: Stats: Success:{}: Failed: {}", statistics[0], statistics[1]);
} else {
LOG.info("Index Recovery: Stats: {}", statistics);
}
} else {
LOG.error("Transaction stats: Invalid transaction recovery object!: Unexpected type: {}: Details: {}", txRecoveryObject.getClass().toString(), txRecoveryObject);
}
} catch (Exception e) {
LOG.error("Error: Retrieving log transaction stats!", e);
}
}
private void reindexElement(ManagementSystem managementSystem, IndexSerializer indexSerializer, MixedIndexType indexType, List<AtlasElement> elements) throws Exception {
Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new HashMap<>();
StandardJanusGraphTx tx = managementSystem.getWrappedTx();
BackendTransaction txHandle = tx.getTxHandle();
try {
JanusGraphElement janusGraphElement = null;
for (AtlasElement element : elements) {
try {
if (element == null || element.getWrappedElement() == null) {
continue;
}
janusGraphElement = element.getWrappedElement();
indexSerializer.reindexElement(janusGraphElement, indexType, documentsPerStore);
} catch (Exception e) {
LOG.warn("{}: Exception: {}:{}", indexType.getName(), e.getClass().getSimpleName(), e.getMessage());
}
}
} finally {
if (txHandle != null) {
txHandle.getIndexTransaction(indexType.getBackingIndexName()).restore(documentsPerStore);
}
}
}
}