blob: c4de0ed276fb77a914c526c44e178b1860c24999 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.inject.Inject;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@Component
public class AuditsWriter {
private static final Logger LOG = LoggerFactory.getLogger(AuditsWriter.class);
private static final String CLUSTER_NAME_DEFAULT = "default";
private static final String DC_SERVER_NAME_SEPARATOR = "$";
private AtlasTypeRegistry typeRegistry;
private AtlasEntityStore entityStore;
private AtlasServerService atlasServerService;
private ExportImportAuditService auditService;
private ExportAudits auditForExport = new ExportAudits();
private ImportAudits auditForImport = new ImportAudits();
@Inject
public AuditsWriter(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, AtlasServerService atlasServerService, ExportImportAuditService auditService) {
this.typeRegistry = typeRegistry;
this.entityStore = entityStore;
this.atlasServerService = atlasServerService;
this.auditService = auditService;
}
public AtlasServerService getAtlasServerService() {
return atlasServerService;
}
public void write(String userName, AtlasExportResult result,
long startTime, long endTime,
List<String> entityCreationOrder) throws AtlasBaseException {
auditForExport.add(userName, result, startTime, endTime, entityCreationOrder);
}
public void write(String userName, AtlasImportResult result,
long startTime, long endTime,
List<String> entityCreationOrder) throws AtlasBaseException {
auditForImport.add(userName, result, startTime, endTime, entityCreationOrder);
}
public void write(String userName, String sourceCluster,
long startTime, long endTime,
Set<String> entityCreationOrder) throws AtlasBaseException {
auditForImport.add(userName, sourceCluster, startTime, endTime, entityCreationOrder);
}
private void updateReplicationAttribute(boolean isReplicationSet,
String serverName, String serverFullName,
List<String> exportedGuids,
String attrNameReplicated,
long lastModifiedTimestamp) throws AtlasBaseException {
if (!isReplicationSet || CollectionUtils.isEmpty(exportedGuids)) {
return;
}
String candidateGuid = exportedGuids.get(0);
String replGuidKey = ReplKeyGuidFinder.get(typeRegistry, entityStore, candidateGuid);
AtlasServer server = saveServer(serverName, serverFullName, replGuidKey, lastModifiedTimestamp);
atlasServerService.updateEntitiesWithServer(server, exportedGuids, attrNameReplicated);
}
private AtlasServer saveServer(String clusterName, String serverFullName,
String entityGuid,
long lastModifiedTimestamp) throws AtlasBaseException {
AtlasServer server = atlasServerService.getCreateAtlasServer(clusterName, serverFullName);
server.setAdditionalInfoRepl(entityGuid, lastModifiedTimestamp);
if (LOG.isDebugEnabled()) {
LOG.debug("saveServer: {}", server);
}
return atlasServerService.save(server);
}
public static String getCurrentClusterName() {
String ret = StringUtils.EMPTY;
try {
ret = ApplicationProperties.get().getString(AtlasConstants.METADATA_NAMESPACE_KEY, StringUtils.EMPTY);
if (StringUtils.isEmpty(ret)) {
ret = ApplicationProperties.get().getString(AtlasConstants.CLUSTER_NAME_KEY, CLUSTER_NAME_DEFAULT);
}
} catch (AtlasException e) {
LOG.error("getCurrentClusterName", e);
}
return ret;
}
static String getServerNameFromFullName(String fullName) {
if (StringUtils.isEmpty(fullName) || !fullName.contains(DC_SERVER_NAME_SEPARATOR)) {
return fullName;
}
String[] splits = StringUtils.split(fullName, DC_SERVER_NAME_SEPARATOR);
if (splits == null || splits.length < 1) {
return "";
} else if (splits.length >= 2) {
return splits[1];
} else {
return splits[0];
}
}
private void saveCurrentServer() throws AtlasBaseException {
atlasServerService.getCreateAtlasServer(getCurrentClusterName(), getCurrentClusterName());
}
static class ReplKeyGuidFinder {
private static final String ENTITY_TYPE_HIVE_DB = "hive_db";
private static final String ENTITY_TYPE_HIVE_TABLE = "hive_table";
private static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
public static String get(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String candidateGuid) {
String guid = null;
try {
guid = getParentEntityGuid(typeRegistry, entityStore, candidateGuid);
} catch (AtlasBaseException e) {
LOG.error("Error fetching parent guid for child entity: {}", candidateGuid);
}
if (StringUtils.isEmpty(guid)) {
guid = candidateGuid;
}
return guid;
}
private static String getParentEntityGuid(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String defaultGuid) throws AtlasBaseException {
AtlasEntity.AtlasEntityWithExtInfo extInfo = entityStore.getById(defaultGuid);
if (extInfo == null || extInfo.getEntity() == null) {
return null;
}
String typeName = extInfo.getEntity().getTypeName();
if (!typeName.equals(ENTITY_TYPE_HIVE_TABLE) && !typeName.equals(ENTITY_TYPE_HIVE_COLUMN)) {
return null;
}
String hiveDBQualifiedName = extractHiveDBQualifiedName((String) extInfo.getEntity().getAttribute(EntityGraphRetriever.QUALIFIED_NAME));
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(ENTITY_TYPE_HIVE_DB);
return entityStore.getGuidByUniqueAttributes(entityType, Collections.singletonMap(EntityGraphRetriever.QUALIFIED_NAME, hiveDBQualifiedName));
}
@VisibleForTesting
static String extractHiveDBQualifiedName(String qualifiedName) {
return String.format("%s@%s",
StringUtils.substringBefore(qualifiedName, "."),
StringUtils.substringAfter(qualifiedName, "@"));
}
}
private class ExportAudits {
private AtlasExportRequest request;
private String targetServerName;
private boolean replicationOptionState;
private String targetServerFullName;
public void add(String userName, AtlasExportResult result,
long startTime, long endTime,
List<String> entityGuids) throws AtlasBaseException {
request = result.getRequest();
replicationOptionState = request.isReplicationOptionSet();
saveCurrentServer();
targetServerFullName = request.getOptionKeyReplicatedTo();
targetServerName = getServerNameFromFullName(targetServerFullName);
auditService.add(userName, getCurrentClusterName(), targetServerName,
ExportImportAuditEntry.OPERATION_EXPORT,
AtlasType.toJson(result), startTime, endTime, !entityGuids.isEmpty());
if (result.getOperationStatus() == AtlasExportResult.OperationStatus.FAIL) {
return;
}
updateReplicationAttribute(replicationOptionState, targetServerName, targetServerFullName,
entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getChangeMarker());
}
}
private class ImportAudits {
private AtlasImportRequest request;
private boolean replicationOptionState;
private String sourceServerName;
private String sourceServerFullName;
public void add(String userName, AtlasImportResult result,
long startTime, long endTime,
List<String> entityGuids) throws AtlasBaseException {
request = result.getRequest();
replicationOptionState = request.isReplicationOptionSet();
saveCurrentServer();
sourceServerFullName = request.getOptionKeyReplicatedFrom();
sourceServerName = getServerNameFromFullName(sourceServerFullName);
auditService.add(userName,
sourceServerName, getCurrentClusterName(),
ExportImportAuditEntry.OPERATION_IMPORT,
AtlasType.toJson(result), startTime, endTime, !entityGuids.isEmpty());
if(result.getOperationStatus() == AtlasImportResult.OperationStatus.FAIL) {
return;
}
updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids,
Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
}
public void add(String userName, String sourceCluster, long startTime,
long endTime, Set<String> entityGuids) throws AtlasBaseException {
sourceServerName = getServerNameFromFullName(sourceCluster);
auditService.add(userName,
sourceServerName, getCurrentClusterName(),
ExportImportAuditEntry.OPERATION_IMPORT_DELETE_REPL,
AtlasType.toJson(entityGuids), startTime, endTime, !entityGuids.isEmpty());
}
}
}