blob: 9764f8f2da3486f75f0012c71babf3ac6f673d5f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.ParamChecker;
import org.apache.atlas.PropertiesUtil;
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.query.Expressions;
import org.apache.atlas.query.GremlinQueryResult;
import org.apache.atlas.query.HiveLineageQuery;
import org.apache.atlas.query.HiveWhereUsedQuery;
import org.apache.atlas.repository.EntityNotFoundException;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.collection.immutable.List;
import javax.inject.Inject;
import javax.inject.Singleton;
/**
* Hive implementation of Lineage service interface.
*/
@Singleton
public class HiveLineageService implements LineageService {
private static final Logger LOG = LoggerFactory.getLogger(HiveLineageService.class);
private static final Option<List<String>> SELECT_ATTRIBUTES =
Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"}));
private static final String HIVE_TABLE_TYPE_NAME;
private static final String HIVE_PROCESS_TYPE_NAME;
private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME;
private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME;
private static final String HIVE_TABLE_SCHEMA_QUERY;
private static final String HIVE_TABLE_EXISTS_QUERY;
static {
// todo - externalize this using type system - dog food
try {
PropertiesConfiguration conf = PropertiesUtil.getApplicationProperties();
HIVE_TABLE_TYPE_NAME = conf.getString("atlas.lineage.hive.table.type.name", "DataSet");
HIVE_PROCESS_TYPE_NAME = conf.getString("atlas.lineage.hive.process.type.name", "Process");
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = conf.getString("atlas.lineage.hive.process.inputs.name", "inputs");
HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = conf.getString("atlas.lineage.hive.process.outputs.name", "outputs");
HIVE_TABLE_SCHEMA_QUERY =
conf.getString("atlas.lineage.hive.table.schema.query", "hive_table where name=\"%s\", columns");
HIVE_TABLE_EXISTS_QUERY = conf.getString("atlas.lineage.hive.table.exists.query",
"from " + HIVE_TABLE_TYPE_NAME + " where name=\"%s\"");
} catch (AtlasException e) {
throw new RuntimeException(e);
}
}
private final TitanGraph titanGraph;
private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
private final GraphBackedDiscoveryService discoveryService;
@Inject
HiveLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository,
GraphBackedDiscoveryService discoveryService) throws DiscoveryException {
this.titanGraph = graphProvider.get();
this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
this.discoveryService = discoveryService;
}
/**
* Return the lineage outputs for the given tableName.
*
* @param tableName tableName
* @return Lineage Outputs as JSON
*/
@Override
@GraphTransaction
public String getOutputs(String tableName) throws AtlasException {
LOG.info("Fetching lineage outputs for tableName={}", tableName);
ParamChecker.notEmpty(tableName, "table name cannot be null");
validateTableExists(tableName);
HiveWhereUsedQuery outputsQuery =
new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
Expressions.Expression expression = outputsQuery.expr();
LOG.debug("Expression is [" + expression.toString() + "]");
try {
return discoveryService.evaluate(expression).toJson();
} catch (Exception e) { // unable to catch ExpressionException
throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
}
}
/**
* Return the lineage outputs graph for the given tableName.
*
* @param tableName tableName
* @return Outputs Graph as JSON
*/
@Override
@GraphTransaction
public String getOutputsGraph(String tableName) throws AtlasException {
LOG.info("Fetching lineage outputs graph for tableName={}", tableName);
ParamChecker.notEmpty(tableName, "table name cannot be null");
validateTableExists(tableName);
HiveWhereUsedQuery outputsQuery =
new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
return outputsQuery.graph().toInstanceJson();
}
/**
* Return the lineage inputs for the given tableName.
*
* @param tableName tableName
* @return Lineage Inputs as JSON
*/
@Override
@GraphTransaction
public String getInputs(String tableName) throws AtlasException {
LOG.info("Fetching lineage inputs for tableName={}", tableName);
ParamChecker.notEmpty(tableName, "table name cannot be null");
validateTableExists(tableName);
HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
Expressions.Expression expression = inputsQuery.expr();
LOG.debug("Expression is [" + expression.toString() + "]");
try {
return discoveryService.evaluate(expression).toJson();
} catch (Exception e) { // unable to catch ExpressionException
throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
}
}
/**
* Return the lineage inputs graph for the given tableName.
*
* @param tableName tableName
* @return Inputs Graph as JSON
*/
@Override
@GraphTransaction
public String getInputsGraph(String tableName) throws AtlasException {
LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
ParamChecker.notEmpty(tableName, "table name cannot be null");
validateTableExists(tableName);
HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
return inputsQuery.graph().toInstanceJson();
}
/**
* Return the schema for the given tableName.
*
* @param tableName tableName
* @return Schema as JSON
*/
@Override
@GraphTransaction
public String getSchema(String tableName) throws AtlasException {
LOG.info("Fetching schema for tableName={}", tableName);
ParamChecker.notEmpty(tableName, "table name cannot be null");
validateTableExists(tableName);
final String schemaQuery = String.format(HIVE_TABLE_SCHEMA_QUERY, tableName);
return discoveryService.searchByDSL(schemaQuery);
}
/**
* Validate if indeed this is a table type and exists.
*
* @param tableName table name
*/
private void validateTableExists(String tableName) throws AtlasException {
final String tableExistsQuery = String.format(HIVE_TABLE_EXISTS_QUERY, tableName);
GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery);
if (!(queryResult.rows().length() > 0)) {
throw new EntityNotFoundException(tableName + " does not exist");
}
}
}