blob: 5c08874ff1bba5769b7563a3aaf3d49b3ef9970b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.converters;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.CreateUpdateEntitiesResult;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.model.instance.GuidMapping;
import org.apache.atlas.model.legacy.EntityResult;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.repository.converters.AtlasFormatConverter.ConverterContext;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@Singleton
@Component
public class AtlasInstanceConverter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasInstanceConverter.class);
private final AtlasTypeRegistry typeRegistry;
private final AtlasFormatConverters instanceFormatters;
private final EntityGraphRetriever entityGraphRetriever;
private final EntityGraphRetriever entityGraphRetrieverIgnoreRelationshipAttrs;
@Inject
public AtlasInstanceConverter(AtlasGraph graph, AtlasTypeRegistry typeRegistry, AtlasFormatConverters instanceFormatters) {
this.typeRegistry = typeRegistry;
this.instanceFormatters = instanceFormatters;
this.entityGraphRetriever = new EntityGraphRetriever(graph, typeRegistry);
this.entityGraphRetrieverIgnoreRelationshipAttrs = new EntityGraphRetriever(graph, typeRegistry, true);
}
public Referenceable[] getReferenceables(Collection<AtlasEntity> entities) throws AtlasBaseException {
Referenceable[] ret = new Referenceable[entities.size()];
AtlasFormatConverter.ConverterContext ctx = new AtlasFormatConverter.ConverterContext();
for(Iterator<AtlasEntity> i = entities.iterator(); i.hasNext(); ) {
ctx.addEntity(i.next());
}
Iterator<AtlasEntity> entityIterator = entities.iterator();
for (int i = 0; i < entities.size(); i++) {
ret[i] = getReferenceable(entityIterator.next(), ctx);
}
return ret;
}
public Referenceable getReferenceable(AtlasEntity entity) throws AtlasBaseException {
return getReferenceable(entity, new ConverterContext());
}
public Referenceable getReferenceable(String guid) throws AtlasBaseException {
AtlasEntityWithExtInfo entity = getAndCacheEntityExtInfo(guid);
return getReferenceable(entity);
}
public Referenceable getReferenceable(AtlasEntityWithExtInfo entity) throws AtlasBaseException {
AtlasFormatConverter.ConverterContext ctx = new AtlasFormatConverter.ConverterContext();
ctx.addEntity(entity.getEntity());
for(Map.Entry<String, AtlasEntity> entry : entity.getReferredEntities().entrySet()) {
ctx.addEntity(entry.getValue());
}
return getReferenceable(entity.getEntity(), ctx);
}
public Referenceable getReferenceable(AtlasEntity entity, final ConverterContext ctx) throws AtlasBaseException {
AtlasFormatConverter converter = instanceFormatters.getConverter(TypeCategory.ENTITY);
AtlasType entityType = typeRegistry.getType(entity.getTypeName());
Referenceable ref = (Referenceable) converter.fromV2ToV1(entity, entityType, ctx);
return ref;
}
public Struct getTrait(AtlasClassification classification) throws AtlasBaseException {
AtlasFormatConverter converter = instanceFormatters.getConverter(TypeCategory.CLASSIFICATION);
AtlasType classificationType = typeRegistry.getType(classification.getTypeName());
Struct trait = (Struct)converter.fromV2ToV1(classification, classificationType, new ConverterContext());
return trait;
}
public AtlasClassification toAtlasClassification(Struct classification) throws AtlasBaseException {
AtlasFormatConverter converter = instanceFormatters.getConverter(TypeCategory.CLASSIFICATION);
AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
if (classificationType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.CLASSIFICATION.name(), classification.getTypeName());
}
AtlasClassification ret = (AtlasClassification)converter.fromV1ToV2(classification, classificationType, new AtlasFormatConverter.ConverterContext());
return ret;
}
public AtlasEntitiesWithExtInfo toAtlasEntity(Referenceable referenceable) throws AtlasBaseException {
AtlasEntityFormatConverter converter = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName());
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), referenceable.getTypeName());
}
ConverterContext ctx = new ConverterContext();
AtlasEntity entity = converter.fromV1ToV2(referenceable, entityType, ctx);
ctx.addEntity(entity);
return ctx.getEntities();
}
public AtlasEntity.AtlasEntitiesWithExtInfo toAtlasEntities(List<Referenceable> referenceables) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> toAtlasEntities({})", referenceables);
}
AtlasFormatConverter.ConverterContext context = new AtlasFormatConverter.ConverterContext();
for (Referenceable referenceable : referenceables) {
AtlasEntity entity = fromV1toV2Entity(referenceable, context);
context.addEntity(entity);
}
AtlasEntity.AtlasEntitiesWithExtInfo ret = context.getEntities();
if (LOG.isDebugEnabled()) {
LOG.debug("<== toAtlasEntities({}): ret=", referenceables, ret);
}
return ret;
}
public AtlasEntitiesWithExtInfo toAtlasEntities(String[] jsonEntities) throws AtlasBaseException, AtlasException {
Referenceable[] referenceables = new Referenceable[jsonEntities.length];
for (int i = 0; i < jsonEntities.length; i++) {
referenceables[i] = AtlasType.fromV1Json(jsonEntities[i], Referenceable.class);
}
AtlasEntityFormatConverter converter = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY);
ConverterContext context = new ConverterContext();
for (Referenceable referenceable : referenceables) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName());
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), referenceable.getTypeName());
}
AtlasEntity entity = converter.fromV1ToV2(referenceable, entityType, context);
context.addEntity(entity);
}
AtlasEntitiesWithExtInfo ret = context.getEntities();
return ret;
}
private AtlasEntity fromV1toV2Entity(Referenceable referenceable, AtlasFormatConverter.ConverterContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> fromV1toV2Entity({})", referenceable);
}
AtlasEntityFormatConverter converter = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY);
AtlasEntity entity = converter.fromV1ToV2(referenceable, typeRegistry.getType(referenceable.getTypeName()), context);
if (LOG.isDebugEnabled()) {
LOG.debug("<== fromV1toV2Entity({}): {}", referenceable, entity);
}
return entity;
}
public CreateUpdateEntitiesResult toCreateUpdateEntitiesResult(EntityMutationResponse reponse) {
CreateUpdateEntitiesResult ret = null;
if (reponse != null) {
Map<EntityOperation, List<AtlasEntityHeader>> mutatedEntities = reponse.getMutatedEntities();
Map<String, String> guidAssignments = reponse.getGuidAssignments();
ret = new CreateUpdateEntitiesResult();
if (MapUtils.isNotEmpty(guidAssignments)) {
ret.setGuidMapping(new GuidMapping(guidAssignments));
}
if (MapUtils.isNotEmpty(mutatedEntities)) {
EntityResult entityResult = new EntityResult();
for (Map.Entry<EntityOperation, List<AtlasEntityHeader>> e : mutatedEntities.entrySet()) {
switch (e.getKey()) {
case CREATE:
List<AtlasEntityHeader> createdEntities = mutatedEntities.get(EntityOperation.CREATE);
if (CollectionUtils.isNotEmpty(createdEntities)) {
Collections.reverse(createdEntities);
entityResult.set(EntityResult.OP_CREATED, getGuids(createdEntities));
}
break;
case UPDATE:
List<AtlasEntityHeader> updatedEntities = mutatedEntities.get(EntityOperation.UPDATE);
if (CollectionUtils.isNotEmpty(updatedEntities)) {
Collections.reverse(updatedEntities);
entityResult.set(EntityResult.OP_UPDATED, getGuids(updatedEntities));
}
break;
case PARTIAL_UPDATE:
List<AtlasEntityHeader> partialUpdatedEntities = mutatedEntities.get(EntityOperation.PARTIAL_UPDATE);
if (CollectionUtils.isNotEmpty(partialUpdatedEntities)) {
Collections.reverse(partialUpdatedEntities);
entityResult.set(EntityResult.OP_UPDATED, getGuids(partialUpdatedEntities));
}
break;
case DELETE:
List<AtlasEntityHeader> deletedEntities = mutatedEntities.get(EntityOperation.DELETE);
if (CollectionUtils.isNotEmpty(deletedEntities)) {
Collections.reverse(deletedEntities);
entityResult.set(EntityResult.OP_DELETED, getGuids(deletedEntities));
}
break;
}
}
ret.setEntityResult(entityResult);
}
}
return ret;
}
public List<String> getGuids(List<AtlasEntityHeader> entities) {
List<String> ret = null;
if (CollectionUtils.isNotEmpty(entities)) {
ret = new ArrayList<>();
for (AtlasEntityHeader entity : entities) {
ret.add(entity.getGuid());
}
}
return ret;
}
public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException {
return getAndCacheEntity(guid, false);
}
public AtlasEntity getAndCacheEntity(String guid, boolean ignoreRelationshipAttributes) throws AtlasBaseException {
RequestContext context = RequestContext.get();
AtlasEntity entity = context.getEntity(guid);
if (entity == null) {
if (ignoreRelationshipAttributes) {
entity = entityGraphRetrieverIgnoreRelationshipAttrs.toAtlasEntity(guid);
} else {
entity = entityGraphRetriever.toAtlasEntity(guid);
}
if (entity != null) {
context.cache(entity);
if (LOG.isDebugEnabled()) {
LOG.debug("Cache miss -> GUID = {}", guid);
}
}
}
return entity;
}
public AtlasEntityWithExtInfo getAndCacheEntityExtInfo(String guid) throws AtlasBaseException {
RequestContext context = RequestContext.get();
AtlasEntityWithExtInfo entityWithExtInfo = context.getEntityWithExtInfo(guid);
if (entityWithExtInfo == null) {
entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
if (entityWithExtInfo != null) {
context.cache(entityWithExtInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("Cache miss -> GUID = {}", guid);
}
}
}
return entityWithExtInfo;
}
public EntityAuditEvent toV1AuditEvent(EntityAuditEventV2 v2Event) throws AtlasBaseException {
EntityAuditEvent ret = new EntityAuditEvent();
ret.setEntityId(v2Event.getEntityId());
ret.setTimestamp(v2Event.getTimestamp());
ret.setUser(v2Event.getUser());
ret.setDetails(v2Event.getDetails());
ret.setEventKey(v2Event.getEventKey());
ret.setAction(getV1AuditAction(v2Event.getAction()));
ret.setEntityDefinition(getReferenceable(v2Event.getEntityId()));
return ret;
}
public EntityAuditEventV2 toV2AuditEvent(EntityAuditEvent v1Event) throws AtlasBaseException {
EntityAuditEventV2 ret = new EntityAuditEventV2();
ret.setEntityId(v1Event.getEntityId());
ret.setTimestamp(v1Event.getTimestamp());
ret.setUser(v1Event.getUser());
ret.setDetails(v1Event.getDetails());
ret.setEventKey(v1Event.getEventKey());
ret.setAction(getV2AuditAction(v1Event.getAction()));
AtlasEntitiesWithExtInfo entitiesWithExtInfo = toAtlasEntity(v1Event.getEntityDefinition());
if (entitiesWithExtInfo != null && CollectionUtils.isNotEmpty(entitiesWithExtInfo.getEntities())) {
// there will only one source entity
AtlasEntity entity = entitiesWithExtInfo.getEntities().get(0);
ret.setEntity(entity);
}
return ret;
}
private EntityAuditEvent.EntityAuditAction getV1AuditAction(EntityAuditEventV2.EntityAuditActionV2 v2AuditAction) {
switch (v2AuditAction) {
case ENTITY_CREATE:
return EntityAuditEvent.EntityAuditAction.ENTITY_CREATE;
case ENTITY_UPDATE:
case BUSINESS_ATTRIBUTE_UPDATE:
case CUSTOM_ATTRIBUTE_UPDATE:
return EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE;
case ENTITY_DELETE:
return EntityAuditEvent.EntityAuditAction.ENTITY_DELETE;
case ENTITY_IMPORT_CREATE:
return EntityAuditEvent.EntityAuditAction.ENTITY_IMPORT_CREATE;
case ENTITY_IMPORT_UPDATE:
return EntityAuditEvent.EntityAuditAction.ENTITY_IMPORT_UPDATE;
case ENTITY_IMPORT_DELETE:
return EntityAuditEvent.EntityAuditAction.ENTITY_IMPORT_DELETE;
case CLASSIFICATION_ADD:
return EntityAuditEvent.EntityAuditAction.TAG_ADD;
case CLASSIFICATION_DELETE:
return EntityAuditEvent.EntityAuditAction.TAG_DELETE;
case CLASSIFICATION_UPDATE:
return EntityAuditEvent.EntityAuditAction.TAG_UPDATE;
case PROPAGATED_CLASSIFICATION_ADD:
return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_ADD;
case PROPAGATED_CLASSIFICATION_DELETE:
return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_DELETE;
case PROPAGATED_CLASSIFICATION_UPDATE:
return EntityAuditEvent.EntityAuditAction.PROPAGATED_TAG_UPDATE;
case LABEL_ADD:
return EntityAuditEvent.EntityAuditAction.LABEL_ADD;
case LABEL_DELETE:
return EntityAuditEvent.EntityAuditAction.LABEL_DELETE;
}
return null;
}
private EntityAuditEventV2.EntityAuditActionV2 getV2AuditAction(EntityAuditEvent.EntityAuditAction v1AuditAction) {
switch (v1AuditAction) {
case ENTITY_CREATE:
return EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE;
case ENTITY_UPDATE:
return EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE;
case ENTITY_DELETE:
return EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE;
case ENTITY_IMPORT_CREATE:
return EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_CREATE;
case ENTITY_IMPORT_UPDATE:
return EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_UPDATE;
case ENTITY_IMPORT_DELETE:
return EntityAuditEventV2.EntityAuditActionV2.ENTITY_IMPORT_DELETE;
case TAG_ADD:
return EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD;
case TAG_DELETE:
return EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE;
case TAG_UPDATE:
return EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE;
case PROPAGATED_TAG_ADD:
return EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD;
case PROPAGATED_TAG_DELETE:
return EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE;
case PROPAGATED_TAG_UPDATE:
return EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_UPDATE;
}
return null;
}
}