| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.atlas.repository.graphdb.janus; |
| |
| import com.google.common.base.Preconditions; |
| import org.apache.atlas.AtlasConfiguration; |
| import org.apache.atlas.repository.graphdb.AtlasCardinality; |
| import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; |
| import org.apache.atlas.repository.graphdb.AtlasEdgeLabel; |
| import org.apache.atlas.repository.graphdb.AtlasElement; |
| import org.apache.atlas.repository.graphdb.AtlasGraphIndex; |
| import org.apache.atlas.repository.graphdb.AtlasGraphManagement; |
| import org.apache.atlas.repository.graphdb.AtlasPropertyKey; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.tinkerpop.gremlin.structure.Direction; |
| import org.apache.tinkerpop.gremlin.structure.Edge; |
| import org.apache.tinkerpop.gremlin.structure.Element; |
| import org.apache.tinkerpop.gremlin.structure.Vertex; |
| import org.janusgraph.core.Cardinality; |
| import org.janusgraph.core.EdgeLabel; |
| import org.janusgraph.core.JanusGraph; |
| import org.janusgraph.core.JanusGraphElement; |
| import org.janusgraph.core.JanusGraphFactory; |
| import org.janusgraph.core.PropertyKey; |
| import org.janusgraph.core.log.TransactionRecovery; |
| import org.janusgraph.core.schema.ConsistencyModifier; |
| import org.janusgraph.core.schema.JanusGraphIndex; |
| import org.janusgraph.core.schema.JanusGraphManagement; |
| import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder; |
| import org.janusgraph.core.schema.Mapping; |
| import org.janusgraph.core.schema.Parameter; |
| import org.janusgraph.core.schema.PropertyKeyMaker; |
| import org.janusgraph.diskstorage.BackendTransaction; |
| import org.janusgraph.diskstorage.indexing.IndexEntry; |
| import org.janusgraph.graphdb.database.IndexSerializer; |
| import org.janusgraph.graphdb.database.StandardJanusGraph; |
| import org.janusgraph.graphdb.database.management.ManagementSystem; |
| import org.janusgraph.graphdb.internal.Token; |
| import org.janusgraph.graphdb.log.StandardTransactionLogProcessor; |
| import org.janusgraph.graphdb.transaction.StandardJanusGraphTx; |
| import org.janusgraph.graphdb.types.IndexType; |
| import org.janusgraph.graphdb.types.MixedIndexType; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.time.Instant; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| /** |
| * Janus implementation of AtlasGraphManagement. |
| */ |
| public class AtlasJanusGraphManagement implements AtlasGraphManagement { |
| private static final boolean lockEnabled = AtlasConfiguration.STORAGE_CONSISTENCY_LOCK_ENABLED.getBoolean(); |
| private static final Parameter[] STRING_PARAMETER_ARRAY = new Parameter[]{Mapping.STRING.asParameter()}; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphManagement.class); |
| private static final char[] RESERVED_CHARS = { '{', '}', '"', '$', Token.SEPARATOR_CHAR }; |
| |
| private AtlasJanusGraph graph; |
| private JanusGraphManagement management; |
| private Set<String> newMultProperties = new HashSet<>(); |
| |
| public AtlasJanusGraphManagement(AtlasJanusGraph graph, JanusGraphManagement managementSystem) { |
| this.management = managementSystem; |
| this.graph = graph; |
| } |
| |
| @Override |
| public void createVertexMixedIndex(String indexName, String backingIndex, List<AtlasPropertyKey> propertyKeys) { |
| IndexBuilder indexBuilder = management.buildIndex(indexName, Vertex.class); |
| |
| for (AtlasPropertyKey key : propertyKeys) { |
| PropertyKey janusKey = AtlasJanusObjectFactory.createPropertyKey(key); |
| indexBuilder.addKey(janusKey); |
| } |
| |
| indexBuilder.buildMixedIndex(backingIndex); |
| } |
| |
| @Override |
| public void createEdgeMixedIndex(String indexName, String backingIndex, List<AtlasPropertyKey> propertyKeys) { |
| IndexBuilder indexBuilder = management.buildIndex(indexName, Edge.class); |
| |
| for (AtlasPropertyKey key : propertyKeys) { |
| PropertyKey janusKey = AtlasJanusObjectFactory.createPropertyKey(key); |
| indexBuilder.addKey(janusKey); |
| } |
| |
| indexBuilder.buildMixedIndex(backingIndex); |
| } |
| |
| @Override |
| public void createEdgeIndex(String label, String indexName, AtlasEdgeDirection edgeDirection, List<AtlasPropertyKey> propertyKeys) { |
| EdgeLabel edgeLabel = management.getEdgeLabel(label); |
| |
| if (edgeLabel == null) { |
| edgeLabel = management.makeEdgeLabel(label).make(); |
| } |
| |
| Direction direction = AtlasJanusObjectFactory.createDirection(edgeDirection); |
| PropertyKey[] keys = AtlasJanusObjectFactory.createPropertyKeys(propertyKeys); |
| |
| if (management.getRelationIndex(edgeLabel, indexName) == null) { |
| management.buildEdgeIndex(edgeLabel, indexName, direction, keys); |
| } |
| } |
| |
| @Override |
| public void createFullTextMixedIndex(String indexName, String backingIndex, List<AtlasPropertyKey> propertyKeys) { |
| IndexBuilder indexBuilder = management.buildIndex(indexName, Vertex.class); |
| |
| for (AtlasPropertyKey key : propertyKeys) { |
| PropertyKey janusKey = AtlasJanusObjectFactory.createPropertyKey(key); |
| indexBuilder.addKey(janusKey, org.janusgraph.core.schema.Parameter.of("mapping", Mapping.TEXT)); |
| } |
| |
| indexBuilder.buildMixedIndex(backingIndex); |
| } |
| |
| @Override |
| public boolean containsPropertyKey(String propertyName) { |
| return management.containsPropertyKey(propertyName); |
| } |
| |
| @Override |
| public void rollback() { |
| management.rollback(); |
| |
| } |
| |
| @Override |
| public void commit() { |
| graph.addMultiProperties(newMultProperties); |
| newMultProperties.clear(); |
| management.commit(); |
| } |
| |
| private static void checkName(String name) { |
| //for some reason, name checking was removed from StandardPropertyKeyMaker.make() |
| //in Janus. For consistency, do the check here. |
| Preconditions.checkArgument(StringUtils.isNotBlank(name), "Need to specify name"); |
| |
| for (char c : RESERVED_CHARS) { |
| Preconditions.checkArgument(name.indexOf(c) < 0, "Name can not contains reserved character %s: %s", c, name); |
| } |
| |
| } |
| |
| @Override |
| public AtlasPropertyKey makePropertyKey(String propertyName, Class propertyClass, AtlasCardinality cardinality) { |
| if (cardinality.isMany()) { |
| newMultProperties.add(propertyName); |
| } |
| |
| PropertyKeyMaker propertyKeyBuilder = management.makePropertyKey(propertyName).dataType(propertyClass); |
| if (cardinality != null) { |
| Cardinality janusCardinality = AtlasJanusObjectFactory.createCardinality(cardinality); |
| propertyKeyBuilder.cardinality(janusCardinality); |
| } |
| |
| PropertyKey propertyKey = propertyKeyBuilder.make(); |
| |
| return GraphDbObjectFactory.createPropertyKey(propertyKey); |
| } |
| |
| @Override |
| public AtlasEdgeLabel makeEdgeLabel(String label) { |
| EdgeLabel edgeLabel = management.makeEdgeLabel(label).make(); |
| |
| return GraphDbObjectFactory.createEdgeLabel(edgeLabel); |
| } |
| |
| @Override |
| public void deletePropertyKey(String propertyKey) { |
| PropertyKey janusPropertyKey = management.getPropertyKey(propertyKey); |
| |
| if (null == janusPropertyKey) return; |
| |
| for (int i = 0;; i++) { |
| String deletedKeyName = janusPropertyKey + "_deleted_" + i; |
| |
| if (null == management.getPropertyKey(deletedKeyName)) { |
| management.changeName(janusPropertyKey, deletedKeyName); |
| break; |
| } |
| } |
| } |
| |
| @Override |
| public AtlasPropertyKey getPropertyKey(String propertyName) { |
| checkName(propertyName); |
| |
| return GraphDbObjectFactory.createPropertyKey(management.getPropertyKey(propertyName)); |
| } |
| |
| @Override |
| public AtlasEdgeLabel getEdgeLabel(String label) { |
| return GraphDbObjectFactory.createEdgeLabel(management.getEdgeLabel(label)); |
| } |
| |
| @Override |
| public String addMixedIndex(String indexName, AtlasPropertyKey propertyKey, boolean isStringField) { |
| PropertyKey janusKey = AtlasJanusObjectFactory.createPropertyKey(propertyKey); |
| JanusGraphIndex janusGraphIndex = management.getGraphIndex(indexName); |
| |
| if(isStringField) { |
| management.addIndexKey(janusGraphIndex, janusKey, Mapping.STRING.asParameter()); |
| LOG.debug("created a string type for {} with janueKey {}.", propertyKey.getName(), janusKey); |
| } else { |
| management.addIndexKey(janusGraphIndex, janusKey); |
| LOG.debug("created a default type for {} with janueKey {}.", propertyKey.getName(), janusKey); |
| } |
| |
| String encodedName = ""; |
| if(isStringField) { |
| encodedName = graph.getIndexFieldName(propertyKey, janusGraphIndex, STRING_PARAMETER_ARRAY); |
| } else { |
| encodedName = graph.getIndexFieldName(propertyKey, janusGraphIndex); |
| } |
| |
| |
| LOG.info("property '{}' is encoded to '{}'.", propertyKey.getName(), encodedName); |
| |
| return encodedName; |
| } |
| |
| @Override |
| public String getIndexFieldName(String indexName, AtlasPropertyKey propertyKey, boolean isStringField) { |
| JanusGraphIndex janusGraphIndex = management.getGraphIndex(indexName); |
| |
| if(isStringField) { |
| return graph.getIndexFieldName(propertyKey, janusGraphIndex, STRING_PARAMETER_ARRAY); |
| } else { |
| return graph.getIndexFieldName(propertyKey, janusGraphIndex); |
| } |
| |
| } |
| |
| public AtlasGraphIndex getGraphIndex(String indexName) { |
| JanusGraphIndex index = management.getGraphIndex(indexName); |
| |
| return GraphDbObjectFactory.createGraphIndex(index); |
| } |
| |
| @Override |
| public boolean edgeIndexExist(String label, String indexName) { |
| EdgeLabel edgeLabel = management.getEdgeLabel(label); |
| |
| return edgeLabel != null && management.getRelationIndex(edgeLabel, indexName) != null; |
| } |
| |
| @Override |
| public void createVertexCompositeIndex(String propertyName, boolean isUnique, List<AtlasPropertyKey> propertyKeys) { |
| createCompositeIndex(propertyName, isUnique, propertyKeys, Vertex.class); |
| } |
| |
| @Override |
| public void createEdgeCompositeIndex(String propertyName, boolean isUnique, List<AtlasPropertyKey> propertyKeys) { |
| createCompositeIndex(propertyName, isUnique, propertyKeys, Edge.class); |
| } |
| |
| private void createCompositeIndex(String propertyName, boolean isUnique, List<AtlasPropertyKey> propertyKeys, Class<? extends Element> elementType) { |
| IndexBuilder indexBuilder = management.buildIndex(propertyName, elementType); |
| |
| for (AtlasPropertyKey key : propertyKeys) { |
| PropertyKey janusKey = AtlasJanusObjectFactory.createPropertyKey(key); |
| indexBuilder.addKey(janusKey); |
| } |
| |
| if (isUnique) { |
| indexBuilder.unique(); |
| } |
| |
| JanusGraphIndex index = indexBuilder.buildCompositeIndex(); |
| |
| if (lockEnabled && isUnique) { |
| management.setConsistency(index, ConsistencyModifier.LOCK); |
| } |
| } |
| |
| @Override |
| public void updateUniqueIndexesForConsistencyLock() { |
| try { |
| setConsistency(this.management, Vertex.class); |
| setConsistency(this.management, Edge.class); |
| } finally { |
| commit(); |
| } |
| } |
| |
| private static void setConsistency(JanusGraphManagement mgmt, Class<? extends Element> elementType) { |
| LOG.info("setConsistency: {}: Starting...", elementType.getSimpleName()); |
| int count = 0; |
| |
| try { |
| Iterable<JanusGraphIndex> iterable = mgmt.getGraphIndexes(elementType); |
| for (JanusGraphIndex index : iterable) { |
| if (!index.isCompositeIndex() || !index.isUnique() || mgmt.getConsistency(index) == ConsistencyModifier.LOCK) { |
| continue; |
| } |
| |
| for (PropertyKey propertyKey : index.getFieldKeys()) { |
| LOG.info("setConsistency: {}: {}", count, propertyKey.name()); |
| } |
| |
| mgmt.setConsistency(index, ConsistencyModifier.LOCK); |
| count++; |
| } |
| } |
| finally { |
| LOG.info("setConsistency: {}: {}: Done!", elementType.getSimpleName(), count); |
| } |
| } |
| |
| @Override |
| public void reindex(String indexName, List<AtlasElement> elements) throws Exception { |
| try { |
| JanusGraphIndex index = management.getGraphIndex(indexName); |
| if (index == null || !(management instanceof ManagementSystem) || !(graph.getGraph() instanceof StandardJanusGraph)) { |
| LOG.error("Could not retrieve index for name: {} ", indexName); |
| return; |
| } |
| |
| ManagementSystem managementSystem = (ManagementSystem) management; |
| IndexType indexType = managementSystem.getSchemaVertex(index).asIndexType(); |
| if (!(indexType instanceof MixedIndexType)) { |
| LOG.warn("Index: {}: Not of MixedIndexType ", indexName); |
| return; |
| } |
| |
| IndexSerializer indexSerializer = ((StandardJanusGraph) graph.getGraph()).getIndexSerializer(); |
| reindexElement(managementSystem, indexSerializer, (MixedIndexType) indexType, elements); |
| } catch (Exception exception) { |
| throw exception; |
| } finally { |
| management.commit(); |
| } |
| } |
| |
| @Override |
| public Object startIndexRecovery(long recoveryStartTime) { |
| Instant recoveryStartInstant = Instant.ofEpochMilli(recoveryStartTime); |
| JanusGraph janusGraph = this.graph.getGraph(); |
| |
| return JanusGraphFactory.startTransactionRecovery(janusGraph, recoveryStartInstant); |
| } |
| |
| @Override |
| public void stopIndexRecovery(Object txRecoveryObject) { |
| if (txRecoveryObject == null) { |
| return; |
| } |
| |
| try { |
| if (txRecoveryObject instanceof TransactionRecovery) { |
| TransactionRecovery txRecovery = (TransactionRecovery) txRecoveryObject; |
| StandardJanusGraph janusGraph = (StandardJanusGraph) this.graph.getGraph(); |
| |
| LOG.info("stopIndexRecovery: Index Client is unhealthy. Index recovery: Paused!"); |
| |
| janusGraph.getBackend().getSystemTxLog().close(); |
| |
| txRecovery.shutdown(); |
| } else { |
| LOG.error("stopIndexRecovery({}): Invalid transaction recovery object!", txRecoveryObject); |
| } |
| } catch (Exception e) { |
| LOG.warn("stopIndexRecovery: Error while shutting down transaction recovery", e); |
| } |
| } |
| |
| @Override |
| public void printIndexRecoveryStats(Object txRecoveryObject) { |
| if (txRecoveryObject == null) { |
| return; |
| } |
| |
| try { |
| if (txRecoveryObject instanceof TransactionRecovery) { |
| StandardTransactionLogProcessor txRecovery = (StandardTransactionLogProcessor) txRecoveryObject; |
| long[] statistics = txRecovery.getStatistics(); |
| |
| if (statistics.length >= 2) { |
| LOG.info("Index Recovery: Stats: Success:{}: Failed: {}", statistics[0], statistics[1]); |
| } else { |
| LOG.info("Index Recovery: Stats: {}", statistics); |
| } |
| } else { |
| LOG.error("Transaction stats: Invalid transaction recovery object!: Unexpected type: {}: Details: {}", txRecoveryObject.getClass().toString(), txRecoveryObject); |
| } |
| } catch (Exception e) { |
| LOG.error("Error: Retrieving log transaction stats!", e); |
| } |
| } |
| |
| private void reindexElement(ManagementSystem managementSystem, IndexSerializer indexSerializer, MixedIndexType indexType, List<AtlasElement> elements) throws Exception { |
| Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new HashMap<>(); |
| StandardJanusGraphTx tx = managementSystem.getWrappedTx(); |
| BackendTransaction txHandle = tx.getTxHandle(); |
| |
| try { |
| JanusGraphElement janusGraphElement = null; |
| for (AtlasElement element : elements) { |
| try { |
| if (element == null || element.getWrappedElement() == null) { |
| continue; |
| } |
| |
| janusGraphElement = element.getWrappedElement(); |
| indexSerializer.reindexElement(janusGraphElement, indexType, documentsPerStore); |
| } catch (Exception e) { |
| LOG.warn("{}: Exception: {}:{}", indexType.getName(), e.getClass().getSimpleName(), e.getMessage()); |
| } |
| } |
| } finally { |
| if (txHandle != null) { |
| txHandle.getIndexTransaction(indexType.getBackingIndexName()).restore(documentsPerStore); |
| } |
| } |
| } |
| } |