blob: 91611de88eb7688df9f270bcc991c613746455ae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.hook.events;
import org.apache.atlas.hive.hook.AtlasHiveHookContext;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
import static org.apache.hadoop.hive.ql.plan.HiveOperation.*;
public class CreateTable extends BaseHiveEvent {
private static final Logger LOG = LoggerFactory.getLogger(CreateTable.class);
public CreateTable(AtlasHiveHookContext context) {
super(context);
}
@Override
public List<HookNotification> getNotificationMessages() throws Exception {
List<HookNotification> ret = null;
AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities();
if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) {
ret = Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities));
}
return ret;
}
public AtlasEntitiesWithExtInfo getHiveMetastoreEntities() throws Exception {
AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo();
ListenerEvent event = context.getMetastoreEvent();
HiveOperation oper = context.getHiveOperation();
Table table;
if (isAlterTable(oper)) {
table = toTable(((AlterTableEvent) event).getNewTable());
} else {
table = toTable(((CreateTableEvent) event).getTable());
}
if (skipTemporaryTable(table)) {
table = null;
}
processTable(table, ret);
addProcessedEntities(ret);
return ret;
}
public AtlasEntitiesWithExtInfo getHiveEntities() throws Exception {
AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo();
Table table = null;
if (CollectionUtils.isNotEmpty(getOutputs())) {
for (Entity entity : getOutputs()) {
if (entity.getType() == Entity.Type.TABLE) {
table = entity.getTable();
if (table != null) {
table = getHive().getTable(table.getDbName(), table.getTableName());
if (table != null) {
if (skipTemporaryTable(table)) {
table = null;
} else {
break;
}
}
}
}
}
}
processTable(table, ret);
addProcessedEntities(ret);
return ret;
}
// create process entities for lineages from HBase/HDFS to hive table
private void processTable(Table table, AtlasEntitiesWithExtInfo ret) throws Exception {
if (table != null) {
AtlasEntity tblEntity = toTableEntity(table, ret);
if (tblEntity != null) {
if (isHBaseStore(table)) {
if (context.isMetastoreHook()) {
//do nothing
} else {
// This create lineage to HBase table in case of Hive on HBase
AtlasEntity hbaseTableEntity = toReferencedHBaseTable(table, ret);
//not a hive metastore hook
//it is running in the context of Hbase.
if (hbaseTableEntity != null) {
final AtlasEntity processEntity;
if (EXTERNAL_TABLE.equals(table.getTableType())) {
processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity));
} else {
processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity));
}
ret.addEntity(processEntity);
AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
ret.addEntity(processExecution);
}
}
} else {
if (context.isMetastoreHook()) {
//it is running in the context of HiveMetastore
//not a hive metastore hook
if (isCreateExtTableOperation(table)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating a dummy process with lineage from hdfs path to hive table");
}
AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
ret.addEntity(processEntity);
ret.addReferredEntity(hdfsPathEntity);
}
} else {
//not a hive metastore hook
//it is running in the context of HiveServer2
if (EXTERNAL_TABLE.equals(table.getTableType())) {
AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret);
AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
ret.addEntity(processEntity);
ret.addReferredEntity(hdfsPathEntity);
AtlasEntity processExecution = getHiveProcessExecutionEntity(processEntity);
ret.addEntity(processExecution);
}
}
}
if (!context.isMetastoreHook()) {
AtlasEntity tableDDLEntity = createHiveDDLEntity(tblEntity);
if (tableDDLEntity != null) {
ret.addEntity(tableDDLEntity);
}
}
}
}
}
private static boolean isAlterTable(HiveOperation oper) {
return (oper == ALTERTABLE_PROPERTIES || oper == ALTERTABLE_RENAME || oper == ALTERTABLE_RENAMECOL);
}
private boolean skipTemporaryTable(Table table) {
// If its an external table, even though the temp table skip flag is on, we create the table since we need the HDFS path to temp table lineage.
return table != null && skipTempTables && table.isTemporary() && !EXTERNAL_TABLE.equals(table.getTableType());
}
private boolean isCreateExtTableOperation(Table table) {
HiveOperation oper = context.getHiveOperation();
TableType tableType = table.getTableType();
return EXTERNAL_TABLE.equals(tableType) && (oper == CREATETABLE || oper == CREATETABLE_AS_SELECT);
}
}