blob: 81884e8f0155724417280cfc6a1d217910b33db3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.typesystem.types;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.persistence.Id;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
/**
* Given a IReferenceableInstance, a Walker will traverse the Object Graph
* reachable form the instance. It will invoke the process call on the provided NodeProcessor
* for each non-primitive attribute (Structs, Traits, References, Arrays of Non-Primitives, Maps
* of Non-Primitives)
*/
public class ObjectGraphWalker {
final Queue<IReferenceableInstance> queue;
final TypeSystem typeSystem;
final NodeProcessor nodeProcessor;
Set<Id> processedIds;
public ObjectGraphWalker(TypeSystem typeSystem, NodeProcessor nodeProcessor) throws AtlasException {
this(typeSystem, nodeProcessor, (IReferenceableInstance) null);
}
public ObjectGraphWalker(TypeSystem typeSystem, NodeProcessor nodeProcessor, IReferenceableInstance start)
throws AtlasException {
this.typeSystem = typeSystem;
this.nodeProcessor = nodeProcessor;
queue = new LinkedList<>();
processedIds = new HashSet<>();
if (start != null) {
visitReferenceableInstance(start);
}
}
public ObjectGraphWalker(TypeSystem typeSystem, NodeProcessor nodeProcessor,
List<? extends IReferenceableInstance> roots) throws AtlasException {
this.typeSystem = typeSystem;
this.nodeProcessor = nodeProcessor;
queue = new LinkedList<IReferenceableInstance>();
processedIds = new HashSet<Id>();
for (IReferenceableInstance r : roots) {
visitReferenceableInstance(r);
}
}
public void walk() throws AtlasException {
while (!queue.isEmpty()) {
IReferenceableInstance r = queue.poll();
if(r != null) {
processReferenceableInstance(r);
}
}
}
public void addRoot(IReferenceableInstance root) {
visitReferenceableInstance(root);
}
void traverseValue(IDataType dT, Object val) throws AtlasException {
if (val != null) {
if (dT.getTypeCategory() == DataTypes.TypeCategory.ARRAY) {
IDataType elemType = ((DataTypes.ArrayType) dT).getElemType();
visitCollection(elemType, val);
} else if (dT.getTypeCategory() == DataTypes.TypeCategory.MAP) {
IDataType keyType = ((DataTypes.MapType) dT).getKeyType();
IDataType valueType = ((DataTypes.MapType) dT).getValueType();
visitMap(keyType, valueType, val);
} else if (dT.getTypeCategory() == DataTypes.TypeCategory.STRUCT
|| dT.getTypeCategory() == DataTypes.TypeCategory.TRAIT) {
visitStruct(val);
} else if (dT.getTypeCategory() == DataTypes.TypeCategory.CLASS) {
visitReferenceableInstance(val);
}
}
}
void visitMap(IDataType keyType, IDataType valueType, Object val) throws AtlasException {
if (keyType.getTypeCategory() == DataTypes.TypeCategory.PRIMITIVE
&& valueType.getTypeCategory() == DataTypes.TypeCategory.PRIMITIVE) {
return;
}
if (val != null) {
Iterator<Map.Entry> it = null;
if (Map.class.isAssignableFrom(val.getClass())) {
it = ((Map) val).entrySet().iterator();
ImmutableMap.Builder b = ImmutableMap.builder();
while (it.hasNext()) {
Map.Entry e = it.next();
traverseValue(keyType, e.getKey());
traverseValue(valueType, e.getValue());
}
}
}
}
void visitCollection(IDataType elemType, Object val) throws AtlasException {
if (elemType.getTypeCategory() == DataTypes.TypeCategory.PRIMITIVE) {
return;
}
if (val != null) {
Iterator it = null;
if (val instanceof Collection) {
it = ((Collection) val).iterator();
} else if (val instanceof Iterable) {
it = ((Iterable) val).iterator();
} else if (val instanceof Iterator) {
it = (Iterator) val;
}
if (it != null) {
DataTypes.TypeCategory elemCategory = elemType.getTypeCategory();
while (it.hasNext()) {
Object elem = it.next();
traverseValue(elemType, elem);
}
}
}
}
void visitStruct(Object val) throws AtlasException {
if (val == null || !(val instanceof IStruct)) {
return;
}
IStruct i = (IStruct) val;
IConstructableType type = typeSystem.getDataType(IConstructableType.class, i.getTypeName());
for (Map.Entry<String, AttributeInfo> e : type.fieldMapping().fields.entrySet()) {
AttributeInfo aInfo = e.getValue();
String attrName = e.getKey();
if (aInfo.dataType().getTypeCategory() != DataTypes.TypeCategory.PRIMITIVE) {
Object aVal = i.get(attrName);
nodeProcessor.processNode(new Node(i, attrName, aInfo, aVal));
traverseValue(aInfo.dataType(), aVal);
}
}
}
void visitReferenceableInstance(Object val) {
if (val == null || !(val instanceof IReferenceableInstance)) {
return;
}
IReferenceableInstance ref = (IReferenceableInstance) val;
if (!processedIds.contains(ref.getId())) {
processedIds.add(ref.getId());
if (!(ref instanceof Id)) {
queue.add(ref);
}
}
}
void processReferenceableInstance(IReferenceableInstance ref) throws AtlasException {
nodeProcessor.processNode(new Node(ref, null, null, null));
visitStruct(ref);
ImmutableList<String> traits = ref.getTraits();
for (String trait : traits) {
visitStruct(ref.getTrait(trait));
}
}
public interface NodeProcessor {
void processNode(Node nd) throws AtlasException;
}
/**
* Represents a non-primitive value of an instance.
*/
public static class Node {
public final IStruct instance;
public final String attributeName;
public final AttributeInfo aInfo;
public final Object value;
public Node(IStruct instance, String attributeName, AttributeInfo aInfo, Object value) {
this.instance = instance;
this.attributeName = attributeName;
this.aInfo = aInfo;
this.value = value;
}
@Override
public String toString(){
StringBuilder string = new StringBuilder().append(instance).append(aInfo).append(value);
return string.toString();
}
}
}