blob: 12e8bb1f2baf589dee8dedeb6cd6cee1c8756eb0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.EntityResolver;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV1.class);
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphDiscoveryContext discoveryContext;
public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, EntityStream entityStream) {
this.typeRegistry = typeRegistry;
this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream);
}
@Override
public void init() throws AtlasBaseException {
//Nothing to do
}
@Override
public EntityGraphDiscoveryContext discoverEntities() throws AtlasBaseException {
// walk through entities in stream and validate them; record entity references
discover();
// resolve entity references discovered in previous step
resolveReferences();
return discoveryContext;
}
@Override
public void validateAndNormalize(AtlasEntity entity) throws AtlasBaseException {
List<String> messages = new ArrayList<>();
if (! AtlasTypeUtil.isValidGuid(entity.getGuid())) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "invalid guid " + entity.getGuid());
}
AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
if (type == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
}
type.validateValue(entity, entity.getTypeName(), messages);
if (!messages.isEmpty()) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
}
type.getNormalizedValue(entity);
}
@Override
public void validateAndNormalizeForUpdate(AtlasEntity entity) throws AtlasBaseException {
List<String> messages = new ArrayList<>();
if (! AtlasTypeUtil.isValidGuid(entity.getGuid())) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "invalid guid " + entity.getGuid());
}
AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
if (type == null) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
}
type.validateValueForUpdate(entity, entity.getTypeName(), messages);
if (!messages.isEmpty()) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
}
type.getNormalizedValueForUpdate(entity);
}
@Override
public void cleanUp() throws AtlasBaseException {
discoveryContext.cleanUp();
}
protected void discover() throws AtlasBaseException {
EntityStream entityStream = discoveryContext.getEntityStream();
Set<String> walkedEntities = new HashSet<>();
// walk through top-level entities and find entity references
while (entityStream.hasNext()) {
AtlasEntity entity = entityStream.next();
if (entity == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "found null entity");
}
walkEntityGraph(entity);
walkedEntities.add(entity.getGuid());
}
// walk through entities referenced by other entities
// referencedGuids will be updated within this for() loop; avoid use of iterators
List<String> referencedGuids = discoveryContext.getReferencedGuids();
for (int i = 0; i < referencedGuids.size(); i++) {
String guid = referencedGuids.get(i);
if (walkedEntities.contains(guid)) {
continue;
}
AtlasEntity entity = entityStream.getByGuid(guid);
if (entity != null) {
walkEntityGraph(entity);
walkedEntities.add(entity.getGuid());
}
}
}
protected void resolveReferences() throws AtlasBaseException {
EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry),
new UniqAttrBasedEntityResolver(typeRegistry)
};
for (EntityResolver resolver : entityResolvers) {
resolver.resolveEntityReferences(discoveryContext);
}
}
private void visitReference(AtlasObjectIdType type, Object val) throws AtlasBaseException {
if (type == null || val == null) {
return;
}
if (val instanceof AtlasObjectId) {
AtlasObjectId objId = (AtlasObjectId)val;
if (!AtlasTypeUtil.isValid(objId)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString());
}
recordObjectReference(objId);
} else if (val instanceof Map) {
AtlasObjectId objId = new AtlasObjectId((Map)val);
if (!AtlasTypeUtil.isValid(objId)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString());
}
recordObjectReference(objId);
} else {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
}
}
void visitAttribute(AtlasType attrType, Object val) throws AtlasBaseException {
if (attrType == null || val == null) {
return;
}
switch (attrType.getTypeCategory()) {
case PRIMITIVE:
case ENUM:
return;
case ARRAY: {
AtlasArrayType arrayType = (AtlasArrayType) attrType;
AtlasType elemType = arrayType.getElementType();
visitCollectionReferences(elemType, val);
}
break;
case MAP: {
AtlasType keyType = ((AtlasMapType) attrType).getKeyType();
AtlasType valueType = ((AtlasMapType) attrType).getValueType();
visitMapReferences(keyType, valueType, val);
}
break;
case STRUCT:
visitStruct((AtlasStructType)attrType, val);
break;
case OBJECT_ID_TYPE:
visitReference((AtlasObjectIdType) attrType, val);
break;
default:
throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, attrType.getTypeCategory().name());
}
}
void visitMapReferences(AtlasType keyType, AtlasType valueType, Object val) throws AtlasBaseException {
if (keyType == null || valueType == null || val == null) {
return;
}
if (isPrimitive(keyType.getTypeCategory()) && isPrimitive(valueType.getTypeCategory())) {
return;
}
if (Map.class.isAssignableFrom(val.getClass())) {
Iterator<Map.Entry> it = ((Map) val).entrySet().iterator();
while (it.hasNext()) {
Map.Entry e = it.next();
visitAttribute(keyType, e.getKey());
visitAttribute(valueType, e.getValue());
}
}
}
void visitCollectionReferences(AtlasType elemType, Object val) throws AtlasBaseException {
if (elemType == null || val == null || isPrimitive(elemType.getTypeCategory())) {
return;
}
Iterator it = null;
if (val instanceof Collection) {
it = ((Collection) val).iterator();
} else if (val instanceof Iterable) {
it = ((Iterable) val).iterator();
} else if (val instanceof Iterator) {
it = (Iterator) val;
}
if (it != null) {
while (it.hasNext()) {
Object elem = it.next();
visitAttribute(elemType, elem);
}
}
}
void visitStruct(AtlasStructType structType, Object val) throws AtlasBaseException {
if (structType == null || val == null) {
return;
}
AtlasStruct struct;
if (val instanceof AtlasStruct) {
struct = (AtlasStruct) val;
} else if (val instanceof Map) {
Map attributes = AtlasTypeUtil.toStructAttributes((Map) val);
struct = new AtlasStruct(structType.getTypeName(), attributes);
} else {
throw new AtlasBaseException(AtlasErrorCode.INVALID_STRUCT_VALUE, val.toString());
}
for (AtlasAttribute attribute : structType.getAllAttributes().values()) {
AtlasType attrType = attribute.getAttributeType();
Object attrVal = struct.getAttribute(attribute.getName());
visitAttribute(attrType, attrVal);
}
}
void walkEntityGraph(AtlasEntity entity) throws AtlasBaseException {
if (entity == null) {
return;
}
AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
recordObjectReference(entity.getGuid());
visitStruct(type, entity);
}
boolean isPrimitive(TypeCategory typeCategory) {
return typeCategory == TypeCategory.PRIMITIVE || typeCategory == TypeCategory.ENUM;
}
private void recordObjectReference(String guid) {
discoveryContext.addReferencedGuid(guid);
}
private void recordObjectReference(AtlasObjectId objId) {
if (AtlasTypeUtil.isValidGuid(objId)) {
discoveryContext.addReferencedGuid(objId.getGuid());
} else {
discoveryContext.addReferencedByUniqAttribs(objId);
}
}
}