blob: 0d47e38fdf5c59af744fdad52adaa523de8ef93c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDatabase;
import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer;
import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.diskstorage.StandardIndexProvider;
import org.janusgraph.diskstorage.StandardStoreManager;
import org.janusgraph.diskstorage.solr.Solr6Index;
import org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer;
import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
import static org.apache.atlas.ApplicationProperties.INDEX_RECOVERY_CONF;
/**
* Default implementation for Graph Provider that doles out JanusGraph.
*/
public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, AtlasJanusEdge> {
private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphDatabase.class);
private static final String OLDER_STORAGE_EXCEPTION = "Storage version is incompatible with current client";
/**
* Constant for the configuration property that indicates the prefix.
*/
public static final String GRAPH_PREFIX = "atlas.graph";
public static final String INDEX_BACKEND_CONF = "index.search.backend";
public static final String SOLR_ZOOKEEPER_URL = "atlas.graph.index.search.solr.zookeeper-url";
public static final String SOLR_ZOOKEEPER_URLS = "atlas.graph.index.search.solr.zookeeper-urls";
public static final String INDEX_BACKEND_LUCENE = "lucene";
public static final String INDEX_BACKEND_ES = "elasticsearch";
public static final String GRAPH_TX_LOG_CONF = "tx.log-tx";
public static final String GRAPH_TX_LOG_VERBOSE_CONF = "tx.recovery.verbose";
public static final String SOLR_INDEX_TX_LOG_TTL_CONF = "write.ahead.log.ttl.in.hours";
public static final String GRAPH_TX_LOG_TTL_CONF = "log.tx.ttl";
public static final long DEFAULT_GRAPH_TX_LOG_TTL = 72; //Hrs
private static volatile AtlasJanusGraph atlasGraphInstance = null;
private static volatile JanusGraph graphInstance;
public AtlasJanusGraphDatabase() {
//update registry
GraphSONMapper.build().addRegistry(JanusGraphIoRegistry.getInstance()).create();
}
public static Configuration getConfiguration() throws AtlasException {
Configuration configProperties = ApplicationProperties.get();
if (isEmbeddedSolr()) { // AtlasJanusGraphIndexClient.performRequestHandlerAction() fails for embedded-solr; disable freetext until this issue is resolved
startEmbeddedSolr();
configProperties.setProperty(ApplicationProperties.ENABLE_FREETEXT_SEARCH_CONF, false);
}
configProperties.setProperty(SOLR_ZOOKEEPER_URLS, configProperties.getStringArray(SOLR_ZOOKEEPER_URL));
Configuration janusConfig = ApplicationProperties.getSubsetConfiguration(configProperties, GRAPH_PREFIX);
//add serializers for non-standard property value types that Atlas uses
janusConfig.addProperty("attributes.custom.attribute1.attribute-class", TypeCategory.class.getName());
janusConfig.addProperty("attributes.custom.attribute1.serializer-class", TypeCategorySerializer.class.getName());
//not ideal, but avoids making large changes to Atlas
janusConfig.addProperty("attributes.custom.attribute2.attribute-class", ArrayList.class.getName());
janusConfig.addProperty("attributes.custom.attribute2.serializer-class", SerializableSerializer.class.getName());
janusConfig.addProperty("attributes.custom.attribute3.attribute-class", BigInteger.class.getName());
janusConfig.addProperty("attributes.custom.attribute3.serializer-class", BigIntegerSerializer.class.getName());
janusConfig.addProperty("attributes.custom.attribute4.attribute-class", BigDecimal.class.getName());
janusConfig.addProperty("attributes.custom.attribute4.serializer-class", BigDecimalSerializer.class.getName());
return janusConfig;
}
static {
addHBase2Support();
addSolr6Index();
}
private static void addHBase2Support() {
try {
Field field = StandardStoreManager.class.getDeclaredField("ALL_MANAGER_CLASSES");
field.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
Map<String, String> customMap = new HashMap<>(StandardStoreManager.getAllManagerClasses());
customMap.put("hbase2", org.janusgraph.diskstorage.hbase2.HBaseStoreManager.class.getName());
ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap);
field.set(null, immap);
LOG.debug("Injected HBase2 support - {}", org.janusgraph.diskstorage.hbase2.HBaseStoreManager.class.getName());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static void addSolr6Index() {
try {
Field field = StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES");
field.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
Map<String, String> customMap = new HashMap<>(StandardIndexProvider.getAllProviderClasses());
customMap.put("solr", Solr6Index.class.getName());
ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap);
field.set(null, immap);
LOG.debug("Injected solr6 index - {}", Solr6Index.class.getName());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static JanusGraph getGraphInstance() {
if (graphInstance == null) {
synchronized (AtlasJanusGraphDatabase.class) {
if (graphInstance == null) {
Configuration config;
try {
config = getConfiguration();
} catch (AtlasException e) {
throw new RuntimeException(e);
}
configureTxLogBasedIndexRecovery();
graphInstance = initJanusGraph(config);
atlasGraphInstance = new AtlasJanusGraph();
validateIndexBackend(config);
}
}
}
return graphInstance;
}
@VisibleForTesting
static JanusGraph initJanusGraph(Configuration config) {
try {
return JanusGraphFactory.open(config);
} catch (JanusGraphException e) {
LOG.warn("JanusGraphException: {}", e.getMessage());
if (e.getMessage().startsWith(OLDER_STORAGE_EXCEPTION)) {
LOG.info("Newer client is being used with older janus storage version. Setting allow-upgrade=true and reattempting connection");
config.addProperty("graph.allow-upgrade", true);
return JanusGraphFactory.open(config);
} else {
throw new RuntimeException(e);
}
}
}
public static void configureTxLogBasedIndexRecovery() {
try {
boolean recoveryEnabled = ApplicationProperties.get().getBoolean(INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY);
long ttl = ApplicationProperties.get().getLong(SOLR_INDEX_TX_LOG_TTL_CONF, DEFAULT_GRAPH_TX_LOG_TTL);
Duration txLogTtlSecs = Duration.ofSeconds(Duration.ofHours(ttl).getSeconds());
Map<String, Object> properties = new HashMap<String, Object>() {{
put(GRAPH_TX_LOG_CONF, recoveryEnabled);
put(GRAPH_TX_LOG_VERBOSE_CONF, recoveryEnabled);
put(GRAPH_TX_LOG_TTL_CONF, txLogTtlSecs);
}};
updateGlobalConfiguration(properties);
LOG.info("Tx Log-based Index Recovery: {}!", recoveryEnabled ? "Enabled" : "Disabled");
} catch (Exception e) {
LOG.error("Error: Failed!", e);
}
}
private static void updateGlobalConfiguration(Map<String, Object> map) {
JanusGraph graph = null;
JanusGraphManagement managementSystem = null;
try {
graph = initJanusGraph(getConfiguration());
managementSystem = graph.openManagement();
for (Map.Entry<String, Object> entry : map.entrySet()) {
managementSystem.set(entry.getKey(), entry.getValue());
}
LOG.info("Global properties updated!: {}", map);
} catch (Exception ex) {
LOG.error("Error updating global configuration: {}", map, ex);
} finally {
if (managementSystem != null) {
managementSystem.commit();
}
if (graph != null) {
graph.close();
}
}
}
public static JanusGraph getBulkLoadingGraphInstance() {
try {
Configuration cfg = getConfiguration();
cfg.setProperty("storage.batch-loading", true);
return JanusGraphFactory.open(cfg);
} catch (IllegalArgumentException ex) {
LOG.error("getBulkLoadingGraphInstance: Failed!", ex);
} catch (AtlasException ex) {
LOG.error("getBulkLoadingGraphInstance: Failed!", ex);
}
return null;
}
public static void unload() {
synchronized (AtlasJanusGraphDatabase.class) {
if (graphInstance == null) {
return;
}
graphInstance.tx().commit();
graphInstance.close();
graphInstance = null;
}
}
static void validateIndexBackend(Configuration config) {
String configuredIndexBackend = config.getString(INDEX_BACKEND_CONF);
JanusGraphManagement managementSystem = getGraphInstance().openManagement();
String currentIndexBackend = managementSystem.get(INDEX_BACKEND_CONF);
managementSystem.commit();
if (!configuredIndexBackend.equals(currentIndexBackend)) {
throw new RuntimeException("Configured Index Backend " + configuredIndexBackend
+ " differs from earlier configured Index Backend " + currentIndexBackend + ". Aborting!");
}
}
@Override
public boolean isGraphLoaded() {
return graphInstance != null;
}
@Override
public void initializeTestGraph() {
//nothing to do
}
@Override
public void cleanup() {
JanusGraph g = getGraphInstance();
try {
if(g != null) {
g.close();
}
} catch (Throwable t) {
LOG.warn("Could not close test JanusGraph", t);
t.printStackTrace();
}
try {
if(g != null) {
JanusGraphFactory.drop(g);
}
} catch (Throwable t) {
LOG.warn("Could not clear test JanusGraph", t);
t.printStackTrace();
}
if (isEmbeddedSolr()) {
try {
stopEmbeddedSolr();
} catch (Throwable t) {
LOG.warn("Could not stop local solr server", t);
}
}
}
@Override
public AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> getGraph() {
getGraphInstance();
return atlasGraphInstance;
}
@Override
public AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> getGraphBulkLoading() {
return new AtlasJanusGraph(getBulkLoadingGraphInstance());
}
private static void startEmbeddedSolr() throws AtlasException {
LOG.info("==> startEmbeddedSolr()");
try {
Class<?> localSolrRunnerClz = Class.forName("org.apache.atlas.runner.LocalSolrRunner");
Method startMethod = localSolrRunnerClz.getMethod("start");
startMethod.invoke(null);
} catch (Exception excp) {
LOG.error("startEmbeddedSolr(): failed", excp);
throw new AtlasException("startEmbeddedSolr(): failed", excp);
}
LOG.info("<== startEmbeddedSolr()");
}
private static void stopEmbeddedSolr() throws AtlasException {
LOG.info("==> stopEmbeddedSolr()");
try {
Class<?> localSolrRunnerClz = Class.forName("org.apache.atlas.runner.LocalSolrRunner");
Method stopMethod = localSolrRunnerClz.getMethod("stop");
stopMethod.invoke(null);
} catch (Exception excp) {
LOG.error("stopEmbeddedSolr(): failed", excp);
throw new AtlasException("stopEmbeddedSolr(): failed", excp);
}
LOG.info("<== stopEmbeddedSolr()");
}
public static boolean isEmbeddedSolr() {
boolean ret = false;
try {
Configuration conf = ApplicationProperties.get();
Object property = conf.getProperty("atlas.graph.index.search.solr.embedded");
if (property != null && property instanceof String) {
ret = Boolean.valueOf((String) property);
}
} catch (AtlasException ignored) { }
return ret;
}
}