blob: ec401b3aafaea644eb52d06601760fccffc61391 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.analysis;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import com.google.common.collect.ImmutableSortedMap;
import org.apache.impala.analysis.ColumnLineageGraph.Vertex.Metadata;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.common.Id;
import org.apache.impala.common.IdGenerator;
import org.apache.impala.thrift.TEdgeType;
import org.apache.impala.thrift.TLineageGraph;
import org.apache.impala.thrift.TMultiEdge;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TVertex;
import org.apache.impala.thrift.TVertexMetadata;
import org.apache.impala.util.TUniqueIdUtil;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Sets;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
/**
* Represents the column lineage graph of a query. This is a directional graph that is
* used to track dependencies among the table/column entities that participate in
* a query. There are two types of dependencies that are represented as edges in the
* column lineage graph:
* a) Projection dependency: This is a dependency between a set of source
* columns (base table columns) and a single target (result expr or table column).
* This dependency indicates that values of the target depend on the values of the source
* columns.
* b) Predicate dependency: This is a dependency between a set of target
* columns (or exprs) and a set of source columns (base table columns). It indicates that
* the source columns restrict the values of their targets (e.g. by participating in
* WHERE clause predicates).
*
* The following dependencies are generated for a query:
* - Exactly one projection dependency for every result expr / target column.
* - Exactly one predicate dependency that targets all result exprs / target cols and
* depends on all columns participating in a conjunct in the query.
* - Special case of analytic fns: One predicate dependency per result expr / target col
* whose value is directly or indirectly affected by an analytic function with a
* partition by and/or order by clause.
*/
public class ColumnLineageGraph {
/**
* Represents a vertex in the column lineage graph. A Vertex may correspond to a base
* table column, a column in the destination table (for the case of INSERT or CTAS
* queries) or a result expr (labeled column of a query result set).
*/
public static final class Vertex implements Comparable<Vertex> {
public static class Metadata {
private final String tableName_; // the table name
private final long tableCreateTime_; // the table/view create time
public Metadata(String tableName, long tableCreateTime) {
tableName_ = tableName;
tableCreateTime_ = tableCreateTime;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Metadata metadata = (Metadata) o;
return tableCreateTime_ == metadata.tableCreateTime_ &&
Objects.equals(tableName_, metadata.tableName_);
}
@Override
public int hashCode() {
return Objects.hash(tableName_, tableCreateTime_);
}
/**
* For testing only. We ignore the table create time.
*/
private boolean equalsForTests(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Metadata metadata = (Metadata) o;
return Objects.equals(tableName_, metadata.tableName_);
}
}
// Unique identifier of this vertex.
private final VertexId id_;
private final String type_ = "COLUMN";
// A fully-qualified column name or the label of a result expr
private final String label_;
// The table metadata.
private final Metadata metadata_;
public Vertex(VertexId id, String label, Metadata metadata) {
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(label);
id_ = id;
label_ = label;
metadata_ = metadata;
}
public VertexId getVertexId() { return id_; }
public String getLabel() { return label_; }
public String getType() { return type_; }
public Metadata getMetadata() { return metadata_; }
@Override
public String toString() { return "(" + id_ + ":" + type_ + ":" + label_ + ")"; }
/**
* Encodes this Vertex object into a JSON object represented by a Map.
*/
public Map<String, Object> toJson() {
// Use a LinkedHashMap to generate a strict ordering of elements.
Map<String,Object> obj = new LinkedHashMap<>();
obj.put("id", id_.asInt());
obj.put("vertexType", type_);
obj.put("vertexId", label_);
if (metadata_ != null) {
JSONObject jsonMetadata = new JSONObject();
jsonMetadata.put("tableName", metadata_.tableName_);
jsonMetadata.put("tableCreateTime", metadata_.tableCreateTime_);
obj.put("metadata", jsonMetadata);
}
return obj;
}
/**
* Constructs a Vertex object from a JSON object. The new object is returned.
*/
public static Vertex fromJsonObj(JSONObject obj) {
int id = ((Long) obj.get("id")).intValue();
String label = (String) obj.get("vertexId");
JSONObject jsonMetadata = (JSONObject) obj.get("metadata");
if (jsonMetadata == null) {
return new Vertex(new VertexId(id), label, null);
}
String tableName = (String) jsonMetadata.get("tableName");
long tableCreateTime = (Long) jsonMetadata.get("tableCreateTime");
return new Vertex(new VertexId(id), label, new Metadata(tableName,
tableCreateTime));
}
/**
* Encodes this Vertex object into a thrift object
*/
public TVertex toThrift() {
TVertex vertex = new TVertex(id_.asInt(), label_);
if (metadata_ != null) {
TVertexMetadata metadata = new TVertexMetadata(metadata_.tableName_,
metadata_.tableCreateTime_);
vertex.setMetadata(metadata);
}
return vertex;
}
/**
* Constructs a Vertex object from a thrift object.
*/
public static Vertex fromThrift(TVertex vertex) {
int id = ((Long) vertex.id).intValue();
TVertexMetadata thriftMetadata = vertex.getMetadata();
Metadata metadata = null;
if (thriftMetadata != null) {
metadata = new Metadata(thriftMetadata.getTable_name(),
thriftMetadata.getTable_create_time());
}
return new Vertex(new VertexId(id), vertex.label, metadata);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Vertex vertex = (Vertex) o;
return Objects.equals(id_, vertex.id_) &&
Objects.equals(type_, vertex.type_) &&
Objects.equals(label_, vertex.label_) &&
Objects.equals(metadata_, vertex.metadata_);
}
@Override
public int hashCode() {
return Objects.hash(id_, type_, label_, metadata_);
}
/**
* For testing only.
*/
private boolean equalsForTests(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Vertex vertex = (Vertex) o;
return Objects.equals(id_, vertex.id_) &&
((metadata_ == vertex.metadata_) ||
metadata_ != null && metadata_.equalsForTests(vertex.metadata_));
}
@Override
public int compareTo(Vertex cmp) { return this.id_.compareTo(cmp.id_); }
}
/**
* Represents the unique identifier of a Vertex.
*/
private static final class VertexId extends Id<VertexId> {
protected VertexId(int id) {
super(id);
}
public static IdGenerator<VertexId> createGenerator() {
return new IdGenerator<VertexId>() {
@Override
public VertexId getNextId() { return new VertexId(nextId_++); }
@Override
public VertexId getMaxId() { return new VertexId(nextId_ - 1); }
};
}
}
/**
* Represents a set of uni-directional edges in the column lineage graph, one edge from
* every source Vertex in 'sources_' to every target Vertex in 'targets_'. An edge
* indicates a dependency between a source and a target Vertex. There are two types of
* edges, PROJECTION and PREDICATE, that are described in the ColumnLineageGraph class.
*/
private static final class MultiEdge {
public static enum EdgeType {
PROJECTION, PREDICATE
}
private final Set<Vertex> sources_;
private final Set<Vertex> targets_;
private final EdgeType edgeType_;
public MultiEdge(Set<Vertex> sources, Set<Vertex> targets, EdgeType type) {
sources_ = sources;
targets_ = targets;
edgeType_ = type;
}
/**
* Return an ordered set of source vertices.
*/
private ImmutableSortedSet<Vertex> getOrderedSources() {
return ImmutableSortedSet.copyOf(sources_);
}
/**
* Return an ordered set of target vertices.
*/
private ImmutableSortedSet<Vertex> getOrderedTargets() {
return ImmutableSortedSet.copyOf(targets_);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
Joiner joiner = Joiner.on(",");
builder.append("Sources: [");
builder.append(joiner.join(getOrderedSources()) + "]\n");
builder.append("Targets: [");
builder.append(joiner.join(getOrderedTargets()) + "]\n");
builder.append("Type: " + edgeType_);
return builder.toString();
}
/**
* Encodes this MultiEdge object to a JSON object represented by a Map.
* Returns a LinkedHashMap to guarantee a consistent ordering of elements.
*/
public Map<String,Object> toJson() {
Map<String,Object> obj = new LinkedHashMap<>();
// Add sources
JSONArray sourceIds = new JSONArray();
for (Vertex vertex: getOrderedSources()) {
sourceIds.add(vertex.getVertexId());
}
obj.put("sources", sourceIds);
// Add targets
JSONArray targetIds = new JSONArray();
for (Vertex vertex: getOrderedTargets()) {
targetIds.add(vertex.getVertexId());
}
obj.put("targets", targetIds);
obj.put("edgeType", edgeType_.toString());
return obj;
}
/**
* Encodes this MultiEdge object to a thrift object
*/
public TMultiEdge toThrift() {
List<TVertex> sources = new ArrayList<>();
for (Vertex vertex: getOrderedSources()) {
sources.add(vertex.toThrift());
}
List<TVertex> targets = new ArrayList<>();
for (Vertex vertex: getOrderedTargets()) {
targets.add(vertex.toThrift());
}
if (edgeType_ == EdgeType.PROJECTION) {
return new TMultiEdge(sources, targets, TEdgeType.PROJECTION);
}
return new TMultiEdge(sources, targets, TEdgeType.PREDICATE);
}
/**
* Constructs a MultiEdge object from a thrift object.
*/
public static MultiEdge fromThrift(TMultiEdge obj) {
Set<Vertex> sources = new HashSet<>();
for (TVertex vertex: obj.sources) {
sources.add(Vertex.fromThrift(vertex));
}
Set<Vertex> targets = new HashSet<>();
for (TVertex vertex: obj.targets) {
targets.add(Vertex.fromThrift(vertex));
}
if (obj.edgetype == TEdgeType.PROJECTION) {
return new MultiEdge(sources, targets, EdgeType.PROJECTION);
}
return new MultiEdge(sources, targets, EdgeType.PREDICATE);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MultiEdge multiEdge = (MultiEdge) o;
return Objects.equals(sources_, multiEdge.sources_) &&
Objects.equals(targets_, multiEdge.targets_) &&
edgeType_ == multiEdge.edgeType_;
}
@Override
public int hashCode() {
return Objects.hash(sources_, targets_, edgeType_);
}
/**
* For testing only.
*/
private boolean equalsForTests(Object obj) {
if (obj == null) return false;
if (obj.getClass() != this.getClass()) return false;
MultiEdge edge = (MultiEdge) obj;
return setEqualsForTests(edge.sources_, this.sources_) &&
setEqualsForTests(edge.targets_, this.targets_) &&
edge.edgeType_ == this.edgeType_;
}
}
public static class ColumnLabel implements Comparable<ColumnLabel> {
private final String columnLabel_;
private final TableName tableName_;
public ColumnLabel(String columnName, TableName tableName) {
columnLabel_ = columnName;
tableName_ = tableName;
}
public ColumnLabel(String columnName) {
this(columnName, null);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ColumnLabel that = (ColumnLabel) o;
return Objects.equals(columnLabel_, that.columnLabel_);
}
@Override
public int hashCode() {
return Objects.hash(columnLabel_);
}
@Override
public int compareTo(ColumnLabel o) {
return columnLabel_.compareTo(o.columnLabel_);
}
}
private final static Logger LOG = LoggerFactory.getLogger(ColumnLineageGraph.class);
// Query statement
private String queryStr_;
private TUniqueId queryId_;
// Name of the user that issued this query
private String user_;
private final List<Expr> resultDependencyPredicates_ = new ArrayList<>();
private final List<MultiEdge> edges_ = new ArrayList<>();
// Timestamp in seconds since epoch (GMT) this query was submitted for execution.
private long timestamp_;
// Map of Vertex labels to Vertex objects.
private final Map<String, Vertex> vertices_ = new HashMap<>();
// Map of Vertex ids to Vertex objects. Used primarily during the construction of the
// ColumnLineageGraph from a serialized JSON object.
private final Map<VertexId, Vertex> idToVertexMap_ = new HashMap<>();
// For an INSERT or a CTAS, these are the columns of the
// destination table plus any partitioning columns (when dynamic partitioning is used).
// For a SELECT stmt, they are the labels of the result exprs.
private final List<ColumnLabel> targetColumnLabels_ = new ArrayList<>();
// Repository for tuple and slot descriptors for this query. Use it to construct the
// column lineage graph.
private DescriptorTable descTbl_;
private final IdGenerator<VertexId> vertexIdGenerator = VertexId.createGenerator();
public ColumnLineageGraph() { }
/**
* Private c'tor, used only for testing.
*/
private ColumnLineageGraph(String stmt, TUniqueId queryId, String user, long timestamp)
{
queryStr_ = stmt;
queryId_ = queryId;
user_ = user;
timestamp_ = timestamp;
}
private void setVertices(Set<Vertex> vertices) {
for (Vertex vertex: vertices) {
vertices_.put(vertex.getLabel(), vertex);
idToVertexMap_.put(vertex.getVertexId(), vertex);
}
}
/**
* Creates a new MultiEdge in the column lineage graph from the sets of 'sources' and
* 'targets' labels (representing column names or result expr labels). The new
* MultiEdge object is returned.
*/
private MultiEdge createMultiEdge(Set<ColumnLabel> targets,
Map<String, SlotDescriptor> sources, MultiEdge.EdgeType type, Analyzer analyzer) {
// createVertex() generates new IDs; we sort the input sets to make the output
// deterministic and independent of the ordering of the input sets.
Set<Vertex> targetVertices = new HashSet<>();
for (ColumnLabel target: ImmutableSortedSet.copyOf(targets)) {
Metadata metadata = null;
if (target.tableName_ != null) {
FeTable feTable = analyzer.getStmtTableCache().tables.get(target.tableName_);
if (feTable != null && feTable.getMetaStoreTable() != null) {
metadata = new Metadata(target.tableName_.toString(),
feTable.getMetaStoreTable().getCreateTime());
} else {
// -1 is just a placeholder that will be updated after the table/view has been
// created. See impala-server.cc (LogLineageRecord) for more information.
metadata = new Metadata(target.tableName_.toString(), -1);
}
}
targetVertices.add(createVertex(target.columnLabel_, metadata));
}
Set<Vertex> sourceVertices = new HashSet<>();
for (Map.Entry<String, SlotDescriptor> source:
ImmutableSortedMap.copyOf(sources).entrySet()) {
FeTable feTable = source.getValue().getParent().getTable();
Preconditions.checkState(feTable != null);
Metadata metadata = feTable != null && feTable.getMetaStoreTable() != null ?
new Metadata(feTable.getTableName().toString(),
feTable.getMetaStoreTable().getCreateTime()) : null;
sourceVertices.add(createVertex(source.getKey(), metadata));
}
MultiEdge edge = new MultiEdge(sourceVertices, targetVertices, type);
edges_.add(edge);
return edge;
}
/**
* Creates a new vertex in the column lineage graph. The new Vertex object is
* returned. If a Vertex with the same label already exists, reuse it.
*/
private Vertex createVertex(String label, Metadata metadata) {
Vertex newVertex = vertices_.get(label);
if (newVertex != null) return newVertex;
newVertex = new Vertex(vertexIdGenerator.getNextId(), label, metadata);
vertices_.put(newVertex.getLabel(), newVertex);
idToVertexMap_.put(newVertex.getVertexId(), newVertex);
return newVertex;
}
/**
* Computes the column lineage graph of a query from the list of query result exprs.
* 'rootAnalyzer' is the Analyzer that was used for the analysis of the query.
*/
public void computeLineageGraph(List<Expr> resultExprs, Analyzer rootAnalyzer) {
init(rootAnalyzer);
// Compute the dependencies only if result expressions are available.
if (resultExprs != null && !resultExprs.isEmpty()) {
computeProjectionDependencies(resultExprs, rootAnalyzer);
computeResultPredicateDependencies(rootAnalyzer);
}
}
/**
* Initialize the ColumnLineageGraph from the root analyzer of a query.
*/
private void init(Analyzer analyzer) {
Preconditions.checkNotNull(analyzer);
Preconditions.checkState(analyzer.isRootAnalyzer());
TQueryCtx queryCtx = analyzer.getQueryCtx();
if (queryCtx.client_request.isSetRedacted_stmt()) {
queryStr_ = queryCtx.client_request.redacted_stmt;
} else {
queryStr_ = queryCtx.client_request.stmt;
}
Preconditions.checkNotNull(queryStr_);
timestamp_ = queryCtx.start_unix_millis / 1000;
descTbl_ = analyzer.getDescTbl();
user_ = analyzer.getUser().getName();
queryId_ = queryCtx.query_id;
}
private void computeProjectionDependencies(List<Expr> resultExprs, Analyzer analyzer) {
Preconditions.checkNotNull(resultExprs);
Preconditions.checkState(!resultExprs.isEmpty());
Preconditions.checkState(resultExprs.size() == targetColumnLabels_.size());
for (int i = 0; i < resultExprs.size(); ++i) {
Expr expr = resultExprs.get(i);
Map<String, SlotDescriptor> sourceBaseCols = new HashMap<>();
List<Expr> dependentExprs = new ArrayList<>();
getSourceBaseCols(expr, sourceBaseCols, dependentExprs, false);
Set<ColumnLabel> targets = Sets.newHashSet(targetColumnLabels_.get(i));
createMultiEdge(targets, sourceBaseCols, MultiEdge.EdgeType.PROJECTION, analyzer);
if (!dependentExprs.isEmpty()) {
// We have additional exprs that 'expr' has a predicate dependency on.
// Gather the transitive predicate dependencies of 'expr' based on its direct
// predicate dependencies. For each direct predicate dependency p, 'expr' is
// transitively predicate dependent on all exprs that p is projection and
// predicate dependent on.
Map<String, SlotDescriptor> predicateBaseCols = new HashMap<>();
for (Expr dependentExpr: dependentExprs) {
getSourceBaseCols(dependentExpr, predicateBaseCols, null, true);
}
createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE,
analyzer);
}
}
}
/**
* Compute predicate dependencies for the query result, i.e. exprs that affect the
* possible values of the result exprs / target columns, such as predicates in a WHERE
* clause.
*/
private void computeResultPredicateDependencies(Analyzer analyzer) {
List<Expr> conjuncts = analyzer.getConjuncts();
for (Expr expr: conjuncts) {
if (expr.isAuxExpr()) continue;
resultDependencyPredicates_.add(expr);
}
Map<String, SlotDescriptor> predicateBaseCols = new HashMap<>();
for (Expr expr: resultDependencyPredicates_) {
getSourceBaseCols(expr, predicateBaseCols, null, true);
}
if (predicateBaseCols.isEmpty()) return;
Set<ColumnLabel> targets = Sets.newHashSet(targetColumnLabels_);
createMultiEdge(targets, predicateBaseCols, MultiEdge.EdgeType.PREDICATE, analyzer);
}
/**
* Identify the base table columns that 'expr' is connected to by recursively resolving
* all associated slots through inline views and materialization points to base-table
* slots. If 'directPredDeps' is not null, it is populated with the exprs that
* have a predicate dependency with 'expr' (e.g. partitioning and order by exprs for
* the case of an analytic function). If 'traversePredDeps' is false, not all the
* children exprs of 'expr' are used to identify the base columns that 'expr' is
* connected to. Which children are filtered depends on the type of 'expr' (e.g. for
* AnalyticFunctionExpr, grouping and sorting exprs are filtered out).
*/
private void getSourceBaseCols(Expr expr, Map<String, SlotDescriptor> sourceBaseCols,
List<Expr> directPredDeps, boolean traversePredDeps) {
List<Expr> exprsToTraverse = getProjectionDeps(expr);
List<Expr> predicateDepExprs = getPredicateDeps(expr);
if (directPredDeps != null) directPredDeps.addAll(predicateDepExprs);
if (traversePredDeps) exprsToTraverse.addAll(predicateDepExprs);
List<SlotId> slotIds = new ArrayList<>();
for (Expr e: exprsToTraverse) {
e.getIds(null, slotIds);
}
for (SlotId slotId: slotIds) {
SlotDescriptor slotDesc = descTbl_.getSlotDesc(slotId);
List<Expr> sourceExprs = slotDesc.getSourceExprs();
if (sourceExprs.isEmpty() && slotDesc.isScanSlot() &&
slotDesc.getPath().isRootedAtTuple()) {
// slot should correspond to a materialized tuple of a table
Preconditions.checkState(slotDesc.getParent().isMaterialized());
List<String> path = slotDesc.getPath().getCanonicalPath();
sourceBaseCols.put(Joiner.on(".").join(path), slotDesc);
} else {
for (Expr sourceExpr: sourceExprs) {
getSourceBaseCols(sourceExpr, sourceBaseCols, directPredDeps,
traversePredDeps);
}
}
}
}
/**
* Retrieve the exprs that 'e' is directly projection dependent on.
* TODO Handle conditional exprs (e.g. CASE, IF).
*/
private List<Expr> getProjectionDeps(Expr e) {
Preconditions.checkNotNull(e);
List<Expr> outputExprs = new ArrayList<>();
if (e instanceof AnalyticExpr) {
AnalyticExpr analytic = (AnalyticExpr) e;
outputExprs.addAll(analytic.getChildren().subList(0,
analytic.getFnCall().getParams().size()));
} else {
outputExprs.add(e);
}
return outputExprs;
}
/**
* Retrieve the exprs that 'e' is directly predicate dependent on.
* TODO Handle conditional exprs (e.g. CASE, IF).
*/
private List<Expr> getPredicateDeps(Expr e) {
Preconditions.checkNotNull(e);
List<Expr> outputExprs = new ArrayList<>();
if (e instanceof AnalyticExpr) {
AnalyticExpr analyticExpr = (AnalyticExpr) e;
outputExprs.addAll(analyticExpr.getPartitionExprs());
for (OrderByElement orderByElem: analyticExpr.getOrderByElements()) {
outputExprs.add(orderByElem.getExpr());
}
}
return outputExprs;
}
public void addDependencyPredicates(Collection<Expr> exprs) {
resultDependencyPredicates_.addAll(exprs);
}
/**
* Encodes the ColumnLineageGraph object to JSON.
*/
public String toJson() {
if (Strings.isNullOrEmpty(queryStr_)) return "";
Map<String, Object> obj = new LinkedHashMap<>();
obj.put("queryText", queryStr_);
obj.put("queryId", TUniqueIdUtil.PrintId(queryId_));
obj.put("hash", getQueryHash(queryStr_));
obj.put("user", user_);
obj.put("timestamp", timestamp_);
// Add edges
JSONArray edges = new JSONArray();
for (MultiEdge edge: edges_) {
edges.add(edge.toJson());
}
obj.put("edges", edges);
// Add vertices
TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values());
JSONArray vertices = new JSONArray();
for (Vertex vertex: sortedVertices) {
vertices.add(vertex.toJson());
}
obj.put("vertices", vertices);
return JSONValue.toJSONString(obj);
}
/**
* Serializes the ColumnLineageGraph to a thrift object
*/
public TLineageGraph toThrift() {
TLineageGraph graph = new TLineageGraph();
if (Strings.isNullOrEmpty(queryStr_)) return graph;
graph.setQuery_text(queryStr_);
graph.setQuery_id(queryId_);
graph.setHash(getQueryHash(queryStr_));
graph.setUser(user_);
graph.setStarted(timestamp_);
// Add edges
List<TMultiEdge> edges = new ArrayList<>();
for (MultiEdge edge: edges_) {
edges.add(edge.toThrift());
}
graph.setEdges(edges);
// Add vertices
TreeSet<Vertex> sortedVertices = Sets.newTreeSet(vertices_.values());
List<TVertex> vertices = new ArrayList<>();
for (Vertex vertex: sortedVertices) {
vertices.add(vertex.toThrift());
}
graph.setVertices(vertices);
return graph;
}
/**
* Creates a LineageGraph object from a thrift object
*/
public static ColumnLineageGraph fromThrift(TLineageGraph obj) {
ColumnLineageGraph lineage =
new ColumnLineageGraph(obj.query_text, obj.query_id, obj.user, obj.started);
Map<TVertex, Vertex> vertexMap = new HashMap<>();
TreeSet<Vertex> vertices = Sets.newTreeSet();
for (TVertex vertex: obj.vertices) {
Vertex v = Vertex.fromThrift(vertex);
vertices.add(v);
}
lineage.setVertices(vertices);
for (TMultiEdge edge: obj.edges) {
MultiEdge e = MultiEdge.fromThrift(edge);
lineage.edges_.add(e);
}
return lineage;
}
private String getQueryHash(String queryStr) {
Hasher hasher = Hashing.md5().newHasher();
hasher.putString(queryStr);
return hasher.hash().toString();
}
/**
* Creates a ColumnLineageGraph object from a serialized JSON record. The new
* ColumnLineageGraph object is returned. Used only during testing.
*/
public static ColumnLineageGraph createFromJSON(String json) {
if (json == null || json.isEmpty()) return null;
JSONParser parser = new JSONParser();
Object obj = null;
try {
obj = parser.parse(json);
} catch (ParseException e) {
LOG.error("Error parsing serialized column lineage graph: " + e.getMessage());
return null;
}
if (!(obj instanceof JSONObject)) return null;
JSONObject jsonObj = (JSONObject) obj;
String stmt = (String) jsonObj.get("queryText");
TUniqueId queryId = TUniqueIdUtil.ParseId((String) jsonObj.get("queryId"));
String user = (String) jsonObj.get("user");
long timestamp = (Long) jsonObj.get("timestamp");
ColumnLineageGraph graph = new ColumnLineageGraph(stmt, queryId, user, timestamp);
JSONArray serializedVertices = (JSONArray) jsonObj.get("vertices");
Set<Vertex> vertices = new HashSet<>();
for (int i = 0; i < serializedVertices.size(); ++i) {
Vertex v = Vertex.fromJsonObj((JSONObject) serializedVertices.get(i));
vertices.add(v);
}
graph.setVertices(vertices);
JSONArray serializedEdges = (JSONArray) jsonObj.get("edges");
for (int i = 0; i < serializedEdges.size(); ++i) {
MultiEdge e =
graph.createMultiEdgeFromJSONObj((JSONObject) serializedEdges.get(i));
graph.edges_.add(e);
}
return graph;
}
private MultiEdge createMultiEdgeFromJSONObj(JSONObject jsonEdge) {
Preconditions.checkNotNull(jsonEdge);
JSONArray sources = (JSONArray) jsonEdge.get("sources");
Set<Vertex> sourceVertices = getVerticesFromJSONArray(sources);
JSONArray targets = (JSONArray) jsonEdge.get("targets");
Set<Vertex> targetVertices = getVerticesFromJSONArray(targets);
MultiEdge.EdgeType type =
MultiEdge.EdgeType.valueOf((String) jsonEdge.get("edgeType"));
return new MultiEdge(sourceVertices, targetVertices, type);
}
private Set<Vertex> getVerticesFromJSONArray(JSONArray vertexIdArray) {
Set<Vertex> vertices = new HashSet<>();
for (int i = 0; i < vertexIdArray.size(); ++i) {
int sourceId = ((Long) vertexIdArray.get(i)).intValue();
Vertex sourceVertex = idToVertexMap_.get(new VertexId(sourceId));
Preconditions.checkNotNull(sourceVertex);
vertices.add(sourceVertex);
}
return vertices;
}
/**
* This is only for testing. It does not check for user and timestamp fields.
*/
public boolean equalsForTests(Object obj) {
if (obj == null) return false;
if (obj.getClass() != this.getClass()) return false;
ColumnLineageGraph g = (ColumnLineageGraph) obj;
if (!mapEqualsForTests(this.vertices_, g.vertices_) ||
!listEqualsForTests(this.edges_, g.edges_)) {
return false;
}
return true;
}
private static boolean mapEqualsForTests(Map<String, Vertex> map1,
Map<String, Vertex> map2) {
if (map1.size() != map2.size()) return false;
Iterator<Entry<String, Vertex>> i = map1.entrySet().iterator();
while (i.hasNext()) {
Entry<String, Vertex> e = i.next();
String key = e.getKey();
Vertex value = e.getValue();
if (value == null) {
if (!(map2.get(key) == null && map2.containsKey(key))) return false;
} else {
if (!value.equalsForTests(map2.get(key))) return false;
}
}
return true;
}
private static boolean setEqualsForTests(Set<Vertex> set1, Set<Vertex> set2) {
if (set1.size() != set2.size()) return false;
for (Vertex v1 : set1) {
boolean found = false;
Iterator<Vertex> i = set2.iterator();
while (i.hasNext()) {
Vertex v2 = i.next();
if (v1.equalsForTests(v2)) {
i.remove();
found = true;
}
}
if (!found) return false;
}
return set2.isEmpty();
}
private static boolean listEqualsForTests(List<MultiEdge> list1,
List<MultiEdge> list2) {
ListIterator<MultiEdge> i1 = list1.listIterator();
ListIterator<MultiEdge> i2 = list2.listIterator();
while (i1.hasNext() && i2.hasNext()) {
MultiEdge e1 = i1.next();
MultiEdge e2 = i2.next();
if (!(e1 == null ? e2 == null : e1.equalsForTests(e2))) {
return false;
}
}
return !(i1.hasNext() || i2.hasNext());
}
public String debugString() {
StringBuilder builder = new StringBuilder();
for (MultiEdge edge: edges_) {
builder.append(edge.toString() + "\n");
}
builder.append(toJson());
return builder.toString();
}
public void addTargetColumnLabels(Collection<ColumnLabel> columnLabels) {
Preconditions.checkNotNull(columnLabels);
targetColumnLabels_.addAll(columnLabels);
}
public void addTargetColumnLabels(FeTable dstTable) {
Preconditions.checkNotNull(dstTable);
for (String columnName: dstTable.getColumnNames()) {
targetColumnLabels_.add(new ColumnLabel(columnName, dstTable.getTableName()));
}
}
}