/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.metadata;
import com.thinkaurelius.titan.core.EdgeLabel;
import com.thinkaurelius.titan.core.Order;
import com.thinkaurelius.titan.core.PropertyKey;
import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.thinkaurelius.titan.graphdb.blueprints.TitanBlueprintsGraph;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphFactory;
import com.tinkerpop.blueprints.KeyIndexableGraph;
import com.tinkerpop.blueprints.TransactionalGraph;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.util.TransactionRetryHelper;
import com.tinkerpop.blueprints.util.TransactionWork;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowExecutionListener;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
* Metadata relationship mapping service. Maps relationships into a graph database.
*/
public class MetadataMappingService
implements FalconService, ConfigurationChangeListener, WorkflowExecutionListener {
/** Logger for this service. */
private static final Logger LOG = LoggerFactory.getLogger(MetadataMappingService.class);
/**
 * Constant for the service name.
 */
public static final String SERVICE_NAME = MetadataMappingService.class.getSimpleName();
/**
 * Prefix of the startup properties that configure the graph database; it is stripped
 * from the property names before they are handed to the graph configuration.
 */
private static final String FALCON_PREFIX = "falcon.graph.";
/**
 * Constant for the configuration property that indicates the storage backend.
 * The two values below are the only ones accepted by the configuration validation.
 */
public static final String PROPERTY_KEY_STORAGE_BACKEND = "storage.backend";
public static final String STORAGE_BACKEND_HBASE = "hbase";
public static final String STORAGE_BACKEND_BDB = "berkeleyje";
/**
 * HBase configuration properties. All keys in PROPERTY_KEYS_HBASE must be present
 * when the storage backend is hbase.
 */
public static final String PROPERTY_KEY_STORAGE_HOSTNAME = "storage.hostname";
public static final String PROPERTY_KEY_STORAGE_TABLE = "storage.hbase.table";
public static final Set<String> PROPERTY_KEYS_HBASE = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
PROPERTY_KEY_STORAGE_HOSTNAME, PROPERTY_KEY_STORAGE_TABLE)));
/**
 * Berkeley DB configuration properties. All keys in PROPERTY_KEYS_BDB must be present
 * when the storage backend is berkeleyje.
 */
public static final String PROPERTY_KEY_STORAGE_DIRECTORY = "storage.directory";
public static final String PROPERTY_KEY_SERIALIZE_PATH = "serialize.path";
public static final Set<String> PROPERTY_KEYS_BDB = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
PROPERTY_KEY_STORAGE_DIRECTORY, PROPERTY_KEY_SERIALIZE_PATH)));
// Graph handle; assigned in init() and shut down in destroy().
private Graph graph;
// Indexed vertex property keys, captured once in init().
private Set<String> vertexIndexedKeys;
// Indexed edge property keys, captured once in init().
private Set<String> edgeIndexedKeys;
// Builder used by the entity add/update callbacks.
private EntityRelationshipGraphBuilder entityGraphBuilder;
// Builder used by the workflow instance callbacks.
private InstanceRelationshipGraphBuilder instanceGraphBuilder;
// Retry count passed to the transaction retry helper ("falcon.graph.transaction.retry.count", default 3).
private int transactionRetries;
// Retry delay passed to the transaction retry helper ("falcon.graph.transaction.retry.delay", default 5).
private long transactionRetryDelayInMillis;
/**
 * Returns the name under which this service is known.
 *
 * @return the value of {@link #SERVICE_NAME}
 */
@Override
public String getName() {
    final String serviceName = SERVICE_NAME;
    return serviceName;
}
/**
 * Initializes the service: reads the transaction retry settings, opens the graph database,
 * creates the vertex key indices when none exist, caches the indexed keys, creates the
 * relationship graph builders and registers this service as a listener with the
 * configuration store and the workflow job end notification service.
 *
 * @throws FalconException if the configured retry count or retry delay is not a valid number
 */
@Override
public void init() throws FalconException {
    final Properties startupProperties = StartupProperties.get();

    // Parse the retry settings first so that a malformed value fails fast, before the graph
    // has been opened and before this instance has been registered as a listener anywhere.
    final int retryCount;
    final long retryDelayInMillis;
    try {
        retryCount = Integer.parseInt(startupProperties.getProperty(
                "falcon.graph.transaction.retry.count", "3").trim());
        retryDelayInMillis = Long.parseLong(startupProperties.getProperty(
                "falcon.graph.transaction.retry.delay", "5").trim());
    } catch (NumberFormatException e) {
        // Keep the original exception as the cause instead of flattening it into the message.
        throw new FalconException("Invalid values for graph transaction retry delay/count", e);
    }
    transactionRetries = retryCount;
    transactionRetryDelayInMillis = retryDelayInMillis;

    graph = initializeGraphDB();
    createIndicesForVertexKeys();
    // todo - create Edge Cardinality Constraints
    LOG.info("Initialized graph db: {}", graph);

    vertexIndexedKeys = getIndexableGraph().getIndexedKeys(Vertex.class);
    LOG.info("Init vertex property keys: {}", vertexIndexedKeys);

    edgeIndexedKeys = getIndexableGraph().getIndexedKeys(Edge.class);
    LOG.info("Init edge property keys: {}", edgeIndexedKeys);

    boolean preserveHistory = Boolean.parseBoolean(startupProperties.getProperty(
            "falcon.graph.preserve.history", "false"));
    entityGraphBuilder = new EntityRelationshipGraphBuilder(graph, preserveHistory);
    instanceGraphBuilder = new InstanceRelationshipGraphBuilder(graph, preserveHistory);

    // Register for notifications only once the service is fully set up.
    ConfigurationStore.get().registerListener(this);
    Services.get().<WorkflowJobEndNotificationService>getService(
            WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this);
}
/**
 * Builds the graph configuration from the startup properties, validates it and opens
 * the graph through {@link GraphFactory}.
 *
 * @return the opened graph
 */
public static Graph initializeGraphDB() {
    LOG.info("Initializing graph db");

    final Configuration configuration = getConfiguration();
    validateConfiguration(configuration);

    final Graph openedGraph = GraphFactory.open(configuration);
    return openedGraph;
}
/**
 * Verifies that the graph configuration names a supported storage backend and contains
 * every property that backend requires.
 *
 * @param graphConfig graph configuration to check; keys are looked up without the
 *                    "falcon.graph." prefix
 * @throws FalconRuntimException if the backend is missing, is not one of the supported
 *                               values, or a property required by the backend is absent
 */
private static void validateConfiguration(Configuration graphConfig) {
    // Name the values that are actually accepted below, and keep a trailing space so the
    // concatenated sentences do not run into each other.
    final String backendChoices = "You need to choose either " + STORAGE_BACKEND_HBASE
            + " or " + STORAGE_BACKEND_BDB + ". ";
    final String titanHelp = "Please check Configuration twiki or "
            + "the section Graph Database Properties in startup.properties "
            + "on how to configure Titan GraphDB backend.";

    // check if storage backend is configured
    if (!graphConfig.containsKey(PROPERTY_KEY_STORAGE_BACKEND)) {
        throw new FalconRuntimException("Titan GraphDB storage backend is not configured. "
                + backendChoices + titanHelp);
    }

    String backend = graphConfig.getString(PROPERTY_KEY_STORAGE_BACKEND);
    switch (backend) {
    case STORAGE_BACKEND_BDB:
        checkRequiredKeys(graphConfig, PROPERTY_KEYS_BDB, "Berkeley DB");
        break;

    case STORAGE_BACKEND_HBASE:
        checkRequiredKeys(graphConfig, PROPERTY_KEYS_HBASE, "HBase");
        break;

    default:
        throw new FalconRuntimException("Invalid graph storage backend: " + backend + ". "
                + backendChoices + titanHelp);
    }
}

/**
 * Fails on the first required key that the graph configuration does not contain.
 *
 * @param graphConfig  graph configuration to check
 * @param requiredKeys keys that must all be present
 * @param backendLabel human readable backend name used in the error message
 * @throws FalconRuntimException if any required key is missing
 */
private static void checkRequiredKeys(Configuration graphConfig, Set<String> requiredKeys,
                                      String backendLabel) {
    for (String key : requiredKeys) {
        if (!graphConfig.containsKey(key)) {
            throw new FalconRuntimException("Required parameter " + FALCON_PREFIX + key
                    + " not found in startup.properties. "
                    + "Please check Configuration twiki or "
                    + "the section Graph Database Properties in startup.properties "
                    + "on how to configure " + backendLabel + " storage backend.");
        }
    }
}
/**
 * Builds the graph configuration from the startup properties: every property whose name
 * starts with "falcon.graph." is copied over with that prefix removed.
 *
 * @return a new configuration holding the graph related startup properties
 */
public static Configuration getConfiguration() {
    Configuration graphConfig = new BaseConfiguration();

    Properties configProperties = StartupProperties.get();
    for (Map.Entry<Object, Object> entry : configProperties.entrySet()) {
        // Properties is a Hashtable<Object, Object>; skip anything that is not a String key
        // instead of failing the whole start-up with a ClassCastException.
        if (!(entry.getKey() instanceof String)) {
            continue;
        }

        String name = (String) entry.getKey();
        if (name.startsWith(FALCON_PREFIX)) {
            String value = String.valueOf(entry.getValue());
            graphConfig.setProperty(name.substring(FALCON_PREFIX.length()), value);
        }
    }

    return graphConfig;
}
/**
 * Creates the graph indices unless the graph already reports indexed vertex keys.
 *
 * This unfortunately needs a handle to the Titan implementation, because
 * com.tinkerpop.blueprints.KeyIndexableGraph#createKeyIndex does not create an index.
 */
protected void createIndicesForVertexKeys() {
    final Set<String> existingVertexKeys = ((KeyIndexableGraph) graph).getIndexedKeys(Vertex.class);
    final boolean indexesPresent = !existingVertexKeys.isEmpty();
    if (indexesPresent) {
        LOG.info("Indexes already exist for graph");
        return;
    }

    LOG.info("Indexes does not exist, Creating indexes for graph");
    // todo - externalize this
    makeNameKeyIndex();

    // one composite vertex index per remaining property, in this order
    for (String indexedProperty : Arrays.asList(
            RelationshipProperty.TYPE.getName(),
            RelationshipProperty.TIMESTAMP.getName(),
            RelationshipProperty.VERSION.getName())) {
        makeKeyIndex(indexedProperty);
    }

    makeInstanceIndex();
}
/**
 * Builds the edge indices used for instance search on the instance-entity edge label:
 * one ordered by nominal time and one ordered by status and nominal time (both descending,
 * on outgoing edges).
 */
private void makeInstanceIndex() {
    TitanManagement titanManagement = getTitanGraph().getManagementSystem();
    try {
        PropertyKey statusKey = makePropertyKey(titanManagement, RelationshipProperty.STATUS.getName());
        PropertyKey nominalTimeKey = makePropertyKey(titanManagement,
                RelationshipProperty.NOMINAL_TIME.getName());

        // Reuse the edge label when it is already defined, mirroring makePropertyKey.
        String labelName = RelationshipLabel.INSTANCE_ENTITY_EDGE.getName();
        EdgeLabel edgeLabel = titanManagement.containsEdgeLabel(labelName)
                ? titanManagement.getEdgeLabel(labelName)
                : titanManagement.makeEdgeLabel(labelName).make();

        titanManagement.buildEdgeIndex(edgeLabel, "indexInstanceN", Direction.OUT, Order.DESC, nominalTimeKey);
        titanManagement.buildEdgeIndex(edgeLabel, "indexInstanceSN", Direction.OUT, Order.DESC,
                statusKey, nominalTimeKey);
        titanManagement.commit();
    } catch (RuntimeException e) {
        // Do not leave the management transaction open when the schema change fails.
        if (titanManagement.isOpen()) {
            titanManagement.rollback();
        }
        throw e;
    }
}
/**
 * Builds the composite indices on the name property, one for vertices
 * ("indexByVertexName") and one for edges ("indexByEdgeName").
 */
private void makeNameKeyIndex() {
    TitanManagement titanManagement = getTitanGraph().getManagementSystem();
    try {
        PropertyKey nameKey = makePropertyKey(titanManagement, RelationshipProperty.NAME.getName());
        titanManagement.buildIndex("indexByVertexName", Vertex.class).addKey(nameKey).buildCompositeIndex();
        titanManagement.buildIndex("indexByEdgeName", Edge.class).addKey(nameKey).buildCompositeIndex();
        titanManagement.commit();
    } catch (RuntimeException e) {
        // Do not leave the management transaction open when the schema change fails.
        if (titanManagement.isOpen()) {
            titanManagement.rollback();
        }
        throw e;
    }
}
/**
 * Builds a composite vertex index named "indexBy" + key on the given property key.
 *
 * @param key name of the property to index
 */
private void makeKeyIndex(String key) {
    TitanManagement titanManagement = getTitanGraph().getManagementSystem();
    try {
        PropertyKey propertyKey = makePropertyKey(titanManagement, key);
        titanManagement.buildIndex("indexBy" + key, Vertex.class).addKey(propertyKey).buildCompositeIndex();
        titanManagement.commit();
    } catch (RuntimeException e) {
        // Do not leave the management transaction open when the schema change fails.
        if (titanManagement.isOpen()) {
            titanManagement.rollback();
        }
        throw e;
    }
}
/**
 * Looks up the property key with the given name, creating it with a String data type
 * when the management system does not know it yet.
 *
 * @param titanManagement management transaction to use
 * @param key             name of the property key
 * @return the existing or newly created property key
 */
private PropertyKey makePropertyKey(TitanManagement titanManagement, String key) {
    final boolean alreadyDefined = titanManagement.containsPropertyKey(key);
    if (!alreadyDefined) {
        return titanManagement.makePropertyKey(key).dataType(String.class).make();
    }

    return titanManagement.getPropertyKey(key);
}
/**
 * @return the graph held by this service; null until init() has assigned it
 */
public Graph getGraph() {
    return this.graph;
}
/**
 * @return the graph held by this service, cast to {@link KeyIndexableGraph}
 */
public KeyIndexableGraph getIndexableGraph() {
    final KeyIndexableGraph indexable = (KeyIndexableGraph) this.graph;
    return indexable;
}
/**
 * @return the graph held by this service, cast to {@link TransactionalGraph}
 */
public TransactionalGraph getTransactionalGraph() {
    final TransactionalGraph transactional = (TransactionalGraph) this.graph;
    return transactional;
}
/**
 * @return the graph held by this service, cast to {@link TitanBlueprintsGraph}
 */
public TitanBlueprintsGraph getTitanGraph() {
    final TitanBlueprintsGraph titan = (TitanBlueprintsGraph) this.graph;
    return titan;
}
/**
 * @return the indexed vertex property keys captured during init()
 */
public Set<String> getVertexIndexedKeys() {
    return this.vertexIndexedKeys;
}
/**
 * @return the indexed edge property keys captured during init()
 */
public Set<String> getEdgeIndexedKeys() {
    return this.edgeIndexedKeys;
}
/**
 * Unregisters this service from the workflow job end notification service and shuts
 * down the graph database.
 *
 * NOTE(review): init() also registers this instance with ConfigurationStore, but nothing
 * here removes that registration - confirm whether that is intended.
 *
 * @throws FalconException declared by the service contract
 */
@Override
public void destroy() throws FalconException {
    try {
        Services.get().<WorkflowJobEndNotificationService>getService(
                WorkflowJobEndNotificationService.SERVICE_NAME).unregisterListener(this);
    } finally {
        // Always release the graph, even when unregistering fails; the graph is null
        // when init() never got far enough to open it.
        if (graph != null) {
            LOG.info("Shutting down graph db");
            graph.shutdown();
        }
    }
}
/**
 * Adds the lineage for a newly added entity to the graph. The work is run through the
 * transaction retry helper's exponential backoff with the configured retry count and delay.
 *
 * @param entity entity that was added
 * @throws FalconException wrapping the failure when the graph update did not succeed
 */
@Override
public void onAdd(final Entity entity) throws FalconException {
    EntityType entityType = entity.getEntityType();
    LOG.info("Adding lineage for entity: {}, type: {}", entity.getName(), entityType);
    try {
        new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
                .perform(new TransactionWork<Void>() {
                    @Override
                    public Void execute(TransactionalGraph transactionalGraph) throws Exception {
                        entityGraphBuilder.addEntity(entity);
                        transactionalGraph.commit();
                        return null;
                    }
                }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
    } catch (Exception e) {
        try {
            getTransactionalGraph().rollback();
        } catch (RuntimeException rollbackFailure) {
            // A failing rollback must not hide the error that triggered it.
            e.addSuppressed(rollbackFailure);
        }
        throw new FalconException(e);
    }
}
/**
 * Intentionally does nothing when an entity is removed.
 *
 * @param entity entity that was removed
 * @throws FalconException never thrown by this implementation
 */
@Override
public void onRemove(Entity entity) throws FalconException {
    // Deleted entities are left in the graph as-is for historical purposes.
    // Open question: should they be marked as deleted?
}
/**
 * Updates the lineage of an entity in the graph after its definition changed. The work is
 * run through the transaction retry helper's exponential backoff with the configured retry
 * count and delay.
 *
 * @param oldEntity entity definition before the change
 * @param newEntity entity definition after the change
 * @throws FalconException wrapping the failure when the graph update did not succeed
 */
@Override
public void onChange(final Entity oldEntity, final Entity newEntity) throws FalconException {
    EntityType entityType = newEntity.getEntityType();
    LOG.info("Updating lineage for entity: {}, type: {}", newEntity.getName(), entityType);
    try {
        new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
                .perform(new TransactionWork<Void>() {
                    @Override
                    public Void execute(TransactionalGraph transactionalGraph) throws Exception {
                        entityGraphBuilder.updateEntity(oldEntity, newEntity);
                        transactionalGraph.commit();
                        return null;
                    }
                }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
    } catch (Exception e) {
        try {
            getTransactionalGraph().rollback();
        } catch (RuntimeException rollbackFailure) {
            // A failing rollback must not hide the error that triggered it.
            e.addSuppressed(rollbackFailure);
        }
        throw new FalconException(e);
    }
}
/**
 * Handles a reloaded entity exactly like a newly added one by delegating to
 * {@link #onAdd(Entity)}.
 *
 * @param entity entity that was reloaded
 * @throws FalconException if adding the entity fails
 */
@Override
public void onReload(Entity entity) throws FalconException {
    this.onAdd(entity);
}
/**
 * Logs the start notification and forwards the context to the instance update handling.
 *
 * @param context workflow execution context of the notification
 * @throws FalconException if the instance update fails
 */
@Override
public void onStart(final WorkflowExecutionContext context) throws FalconException {
    LOG.info("onStart {}", context);
    this.onInstanceExecutionUpdate(context);
}
/**
 * Logs the success notification and forwards the context to the instance update handling.
 *
 * @param context workflow execution context of the notification
 * @throws FalconException if the instance update fails
 */
@Override
public void onSuccess(final WorkflowExecutionContext context) throws FalconException {
    LOG.info("onSuccess {}", context);
    this.onInstanceExecutionUpdate(context);
}
/**
 * Logs the failure notification and forwards the context to the instance update handling.
 *
 * @param context workflow execution context of the notification
 * @throws FalconException if the instance update fails
 */
@Override
public void onFailure(final WorkflowExecutionContext context) throws FalconException {
    LOG.info("onFailure {}", context);
    this.onInstanceExecutionUpdate(context);
}
/**
 * Logs the suspend notification and forwards the context to the instance update handling.
 *
 * @param context workflow execution context of the notification
 * @throws FalconException if the instance update fails
 */
@Override
public void onSuspend(final WorkflowExecutionContext context) throws FalconException {
    LOG.info("onSuspend {}", context);
    this.onInstanceExecutionUpdate(context);
}
/**
 * Logs the wait notification and forwards the context to the instance update handling.
 *
 * @param context workflow execution context of the notification
 * @throws FalconException if the instance update fails
 */
@Override
public void onWait(final WorkflowExecutionContext context) throws FalconException {
    LOG.info("onWait {}", context);
    this.onInstanceExecutionUpdate(context);
}
/**
 * Applies an instance status update to the graph inside a transaction. The work is run
 * through the transaction retry helper's exponential backoff with the configured retry
 * count and delay.
 *
 * @param context workflow execution context describing the instance
 * @throws FalconException wrapping the failure when the graph update did not succeed
 */
private void onInstanceExecutionUpdate(final WorkflowExecutionContext context) throws FalconException {
    try {
        new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
                .perform(new TransactionWork<Void>() {
                    @Override
                    public Void execute(TransactionalGraph transactionalGraph) throws Exception {
                        updateInstanceStatus(context);
                        transactionalGraph.commit();
                        return null;
                    }
                }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
    } catch (Exception e) {
        try {
            getTransactionalGraph().rollback();
        } catch (RuntimeException rollbackFailure) {
            // A failing rollback must not hide the error that triggered it.
            e.addSuppressed(rollbackFailure);
        }
        throw new FalconException(e);
    }
}
/**
 * Dispatches an instance update to the handler matching the context's entity operation.
 * Contexts of type COORDINATOR_ACTION are ignored.
 *
 * @param context workflow execution context describing the instance
 * @throws FalconException          if the selected handler fails
 * @throws IllegalArgumentException if the operation has no handler
 */
private void updateInstanceStatus(final WorkflowExecutionContext context) throws FalconException {
    final boolean coordinatorAction =
            context.getContextType() == WorkflowExecutionContext.Type.COORDINATOR_ACTION;
    if (!coordinatorAction) {
        final WorkflowExecutionContext.EntityOperations operation = context.getOperation();
        switch (operation) {
        case GENERATE:
            updateProcessInstance(context);
            break;

        case REPLICATE:
            updateReplicatedFeedInstance(context);
            break;

        case DELETE:
            updateEvictedFeedInstance(context);
            break;

        case IMPORT:
            updateImportedFeedInstance(context);
            break;

        case EXPORT:
            updateExportedFeedInstance(context);
            break;

        default:
            throw new IllegalArgumentException("Invalid EntityOperation - " + operation);
        }
    }
    // Coordinator actions fall through without any update.
    // TODO(yzheng): FALCON-1776 Instance update on titan DB based on JMS notifications on coordinator actions
}
/**
 * Adds the process instance vertex for the context, then attaches its output feed
 * instances followed by its input feed instances.
 *
 * @param context workflow execution context describing the process instance
 * @throws FalconException if the instance graph builder fails
 */
private void updateProcessInstance(WorkflowExecutionContext context) throws FalconException {
    LOG.info("Updating process instance: {}", context.getNominalTimeAsISO8601());

    final Vertex processVertex = instanceGraphBuilder.addProcessInstance(context);
    // outputs first, then inputs
    instanceGraphBuilder.addOutputFeedInstances(context, processVertex);
    instanceGraphBuilder.addInputFeedInstances(context, processVertex);
}
/**
 * Logs the nominal time and adds the replicated feed instance through the instance graph builder.
 *
 * @param context workflow execution context describing the replicated instance
 * @throws FalconException if the instance graph builder fails
 */
private void updateReplicatedFeedInstance(WorkflowExecutionContext context) throws FalconException {
    final String nominalTime = context.getNominalTimeAsISO8601();
    LOG.info("Updating replicated feed instance: {}", nominalTime);
    instanceGraphBuilder.addReplicatedInstance(context);
}
/**
 * Logs the nominal time and adds the evicted feed instance through the instance graph builder.
 *
 * @param context workflow execution context describing the evicted instance
 * @throws FalconException if the instance graph builder fails
 */
private void updateEvictedFeedInstance(WorkflowExecutionContext context) throws FalconException {
    final String nominalTime = context.getNominalTimeAsISO8601();
    LOG.info("Updating evicted feed instance: {}", nominalTime);
    instanceGraphBuilder.addEvictedInstance(context);
}
/**
 * Logs the nominal time and adds the imported feed instance through the instance graph builder.
 *
 * @param context workflow execution context describing the imported instance
 * @throws FalconException if the instance graph builder fails
 */
private void updateImportedFeedInstance(WorkflowExecutionContext context) throws FalconException {
    final String nominalTime = context.getNominalTimeAsISO8601();
    LOG.info("Updating imported feed instance: {}", nominalTime);
    instanceGraphBuilder.addImportedInstance(context);
}
/**
 * Logs the nominal time and adds the exported feed instance through the instance graph builder.
 *
 * @param context workflow execution context describing the exported instance
 * @throws FalconException if the instance graph builder fails
 */
private void updateExportedFeedInstance(WorkflowExecutionContext context) throws FalconException {
    final String nominalTime = context.getNominalTimeAsISO8601();
    LOG.info("Updating export feed instance: {}", nominalTime);
    instanceGraphBuilder.addExportedInstance(context);
}
}