/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.hive.hook.events;

import org.apache.atlas.hive.hook.AtlasHiveHookContext;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyKey;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
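
/**
 * Hive hook event handler for operations that create a process in Atlas: it builds a
 * hive_process entity from the query's inputs and outputs, a companion process-execution
 * entity, DDL entities for CTAS/CREATE VIEW style operations, and hive_column_lineage
 * entities derived from Hive's LineageInfo.
 */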
public class CreateHiveProcess extends BaseHiveEvent {
    private static final Logger LOG = LoggerFactory.getLogger(CreateHiveProcess.class);

    public CreateHiveProcess(AtlasHiveHookContext context) {
        super(context);
    }
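
    /**
     * Returns the notification messages for this event: a single
     * {@link EntityCreateRequestV2} carrying all entities collected by
     * {@link #getEntities()}, or null when there is nothing to notify.
     */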
    @Override
    public List<HookNotification> getNotificationMessages() throws Exception {
        List<HookNotification>   ret      = null;
        AtlasEntitiesWithExtInfo entities = getEntities();

        if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) {
            ret = Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities));
        }

        return ret;
    }
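
    /**
     * Collects the entities to notify for this event: de-duplicated input/output
     * entities, the process and process-execution entities (for non-metastore hooks),
     * DDL entities where applicable, and column lineage. Returns null when the
     * process should be skipped.
     */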
    public AtlasEntitiesWithExtInfo getEntities() throws Exception {
        AtlasEntitiesWithExtInfo ret = null;

        if (!skipProcess()) {
            List<AtlasEntity> inputs         = new ArrayList<>();
            List<AtlasEntity> outputs        = new ArrayList<>();
            Set<String>       processedNames = new HashSet<>();

            ret = new AtlasEntitiesWithExtInfo();

            if (getInputs() != null) {
                for (ReadEntity input : getInputs()) {
                    String qualifiedName = getQualifiedName(input);

                    if (qualifiedName == null || !processedNames.add(qualifiedName)) {
                        continue;
                    }

                    // the entity is resolved before the isDirect() check, apparently so that
                    // indirect inputs are still registered in the context (e.g. for column lineage)
                    AtlasEntity entity = getInputOutputEntity(input, ret, skipTempTables);

                    if (!input.isDirect()) {
                        continue;
                    }

                    if (entity != null) {
                        inputs.add(entity);
                    }
                }
            }

            if (getOutputs() != null) {
                for (WriteEntity output : getOutputs()) {
                    String qualifiedName = getQualifiedName(output);

                    if (qualifiedName == null || !processedNames.add(qualifiedName)) {
                        continue;
                    }

                    AtlasEntity entity = getInputOutputEntity(output, ret, skipTempTables);

                    if (entity != null) {
                        outputs.add(entity);
                    }

                    if (isDdlOperation(entity)) {
                        AtlasEntity ddlEntity = createHiveDDLEntity(entity);

                        if (ddlEntity != null) {
                            ret.addEntity(ddlEntity);
                        }
                    }
                }
            }

            boolean skipProcess = inputs.isEmpty() && outputs.isEmpty();

            if (!skipProcess) {
                if (inputs.isEmpty() && context.isSkippedInputEntity()) {
                    skipProcess = true;
                } else if (outputs.isEmpty() && context.isSkippedOutputEntity()) {
                    skipProcess = true;
                }
            }

            if (!skipProcess && !context.isMetastoreHook()) {
                AtlasEntity process = getHiveProcessEntity(inputs, outputs);

                ret.addEntity(process);

                AtlasEntity processExecution = getHiveProcessExecutionEntity(process);

                ret.addEntity(processExecution);

                processColumnLineage(process, ret);

                addProcessedEntities(ret);
            } else {
                ret = null;
            }
        }

        return ret;
    }
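
    /**
     * Derives hive_column_lineage entities from Hive's LineageInfo: one entity per
     * output column, linking it to the input columns it was derived from. As a guard
     * against HIVE-20633, all column lineage is dropped when the average number of
     * input columns per output column exceeds the configured threshold.
     */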
    private void processColumnLineage(AtlasEntity hiveProcess, AtlasEntitiesWithExtInfo entities) {
        LineageInfo lineageInfo = getLineageInfo();

        if (lineageInfo == null || CollectionUtils.isEmpty(lineageInfo.entrySet())) {
            return;
        }

        final List<AtlasEntity> columnLineages      = new ArrayList<>();
        final Set<String>       processedOutputCols = new HashSet<>();
        int                     lineageInputsCount  = 0;

        for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) {
            String      outputColName = getQualifiedName(entry.getKey());
            AtlasEntity outputColumn  = context.getEntity(outputColName);

            if (LOG.isDebugEnabled()) {
                LOG.debug("processColumnLineage(): DependencyKey={}; Dependency={}", entry.getKey(), entry.getValue());
            }

            if (outputColumn == null) {
                LOG.warn("column-lineage: non-existing output-column {}", outputColName);

                continue;
            }

            if (!processedOutputCols.add(outputColName)) {
                LOG.warn("column-lineage: duplicate for output-column {}", outputColName);

                continue;
            }

            List<AtlasEntity> inputColumns = new ArrayList<>();

            for (BaseColumnInfo baseColumn : getBaseCols(entry.getValue())) {
                String      inputColName = getQualifiedName(baseColumn);
                AtlasEntity inputColumn  = context.getEntity(inputColName);

                if (inputColumn == null) {
                    LOG.warn("column-lineage: non-existing input-column {} for output-column={}", inputColName, outputColName);

                    continue;
                }

                inputColumns.add(inputColumn);
            }

            if (inputColumns.isEmpty()) {
                continue;
            }

            lineageInputsCount += inputColumns.size();

            AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE);

            columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
            columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(inputColumns, BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS));
            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(outputColumn, BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, AtlasTypeUtil.getAtlasRelatedObjectId(hiveProcess, BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE));
            columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType());
            columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr());

            columnLineages.add(columnLineageProcess);
        }

        float   avgInputsCount    = columnLineages.size() > 0 ? (((float) lineageInputsCount) / columnLineages.size()) : 0;
        boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && avgInputsCount > context.getSkipHiveColumnLineageHive20633InputsThreshold();

        if (!skipColumnLineage) {
            for (AtlasEntity columnLineage : columnLineages) {
                entities.addEntity(columnLineage);
            }
        } else {
            LOG.warn("skipped {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}", columnLineages.size(), avgInputsCount, context.getSkipHiveColumnLineageHive20633InputsThreshold(), lineageInputsCount);
        }
    }
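
    /**
     * Extracts the base (input) columns of a lineage dependency. The call goes through
     * reflection, apparently because the return type of Dependency.getBaseCols() varies
     * across Hive versions; any Collection result is accepted.
     */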
    private Collection<BaseColumnInfo> getBaseCols(Dependency lInfoDep) {
        Collection<BaseColumnInfo> ret = Collections.emptyList();

        if (lInfoDep != null) {
            try {
                Method getBaseColsMethod = lInfoDep.getClass().getMethod("getBaseCols");
                Object retGetBaseCols    = getBaseColsMethod.invoke(lInfoDep);

                if (retGetBaseCols != null) {
                    if (retGetBaseCols instanceof Collection) {
                        ret = (Collection<BaseColumnInfo>) retGetBaseCols;
                    } else {
                        LOG.warn("getBaseCols(): expected a Collection, but LineageInfo.Dependency.getBaseCols() returned {}", retGetBaseCols.getClass().getName());
                    }
                }
            } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
                LOG.warn("getBaseCols()", ex);
            }
        }

        return ret;
    }
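
    /**
     * Decides whether this event should be ignored: when there are no inputs and no
     * outputs, when a QUERY merely writes to a temporary DFS/local path, or when the
     * operation is a DELETE/UPDATE that carries no usable lineage.
     */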
    private boolean skipProcess() {
        Set<ReadEntity>  inputs  = getInputs();
        Set<WriteEntity> outputs = getOutputs();

        boolean ret = CollectionUtils.isEmpty(inputs) && CollectionUtils.isEmpty(outputs);

        if (!ret) {
            if (getContext().getHiveOperation() == HiveOperation.QUERY) {
                // Select query has only one output
                if (outputs.size() == 1) {
                    WriteEntity output = outputs.iterator().next();

                    if (output.getType() == Entity.Type.DFS_DIR || output.getType() == Entity.Type.LOCAL_DIR) {
                        if (output.getWriteType() == WriteEntity.WriteType.PATH_WRITE && output.isTempURI()) {
                            ret = true;
                        }
                    }

                    // DELETE and UPDATE initially have one input and one output.
                    // Since they do not support sub-queries, they will not create lineage
                    // that has both an input and an output (they have one input only),
                    // so it is safe to filter them out here.
                    if (output.getWriteType() == WriteEntity.WriteType.DELETE || output.getWriteType() == WriteEntity.WriteType.UPDATE) {
                        ret = true;
                    }
                }
            }
        }

        return ret;
    }
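
    /**
     * Returns true, for non-metastore hooks, when the operation creates an entity via
     * DDL: CREATE TABLE AS SELECT, CREATE VIEW, ALTER VIEW AS or CREATE MATERIALIZED
     * VIEW; such outputs get an additional DDL entity via createHiveDDLEntity().
     */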
    private boolean isDdlOperation(AtlasEntity entity) {
        return entity != null && !context.isMetastoreHook()
                && (context.getHiveOperation().equals(HiveOperation.CREATETABLE_AS_SELECT)
                    || context.getHiveOperation().equals(HiveOperation.CREATEVIEW)
                    || context.getHiveOperation().equals(HiveOperation.ALTERVIEW_AS)
                    || context.getHiveOperation().equals(HiveOperation.CREATE_MATERIALIZED_VIEW));
    }
}