blob: 10b6c15c0ce39ce6463ab6c36c96af4aebcaa345 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.math;
import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.NotNull;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import com.datatorrent.api.Context;
import com.datatorrent.lib.xml.AbstractXmlDOMOperator;
import com.datatorrent.netlet.util.DTThrowable;
/**
* An operator that performs a cartesian product between different elements in an XML document.
* <p>
* The cartesian product is performed between two sets of elements. The elements are specified
* using xpath. The resultant product contains the values of the elements. Multiple
* cartesian products can be specified in a single operator.
*
* The cartesian product sets are specified using the config parameter. The configuration
* specification is as follows:
*
* 1. Product: A cartesian product can be specified as follows
*
* a1,a2:b1,b2,b3
*
* Here a1 denotes the xpath of the first element in the first set,
* b1 denotes the xpath of the first element of second set and so on.
* In the above example a cartesian product is specified between two sets one
* containing 2 elements and the other containing 3 elements. In practice
* any number of elements can be specified in either set. Notice the delimiter ':'
* between the two sets.
*
* The result of the cartesian product is a collection of all the combinations of the values
* of the elements of the two sets taking one from each set at a time. The value of an element is customizable
* and can be specified by extending the operator. For example if the element is a leaf element its text value
* could be used in the value. If we denote the value of an element using the notation v[element], in the above
* example the product would be the collection of the following elements (v[a1] v[b1]), (v[a1] v[b2]),
* (v[a1] v[b3]), (v[a2] v[b1]), (v[a2] v[b2]) and (v[a2] v[b3])
*
* The format of the elements in the product can be customized by implementing the abstract methods when extending
* the operator.
*
* 2. Grouping: Sometimes a subset of the elements need to be grouped together. This operator supports grouping.
* The group is treated as a single element when computing the product and the elements in the group are kept
* together. A group can be specified as follows
*
* a1,a2:(b1,b2),b3
*
* Groups are specified using a pair of opening and closing parentheses. In the above example b1 and b2 are grouped
* together and treated as a single element when computing the product. In this case the result of the product would
* be (v[a1] v[b1] v[b2]), (v[a2] v[b1] v[b2]), (v[a1] v[b3]), and (v[a2] v[b3]). Notice that v[b1] and v[b2] are
* not separated and go together. Grouping can be specified in either set or in both.
*
* 3. Section: In some cases there is more than one parent element with the same tag name in the xml
* document and a product needs to be performed with the children of each parent. This is sectional
* specification and is done as follows
*
* a#(b1,b2:c1,c2)
*
* The parent elements are specified using an absolute xpath, in the above example 'a'. The child elements
* are specified using relative paths from the parent element. The parent and child specifications are
* separated by the delimiter '#'.
*
* In the above example all the parent elements with xpath 'a' are retrieved and for each element the cartesian
* product is performed on the specified children. In this case for every 'a' element the product collection
* (v[a/b1] v[a/c1]), (v[a/b1] v[a/c2]), (v[a/b2] v[a/c1]), (v[a/b2] v[a/c2]) is computed. All the product
* collections are accumulated into a result collection.
*
* Grouping can also be used inside sectional specification. In the above example the b children can be grouped
* as follows
*
* a#((b1, b2):c1,c2)
*
* 4. Element shortcut: If an element that does not contain a value directly is specified but it has child elements
* that have values then it is automatically substituted with all those child elements that have values traversing
* till the leaf elements. For example if the xml document is
*
* <a>
* <b>
* <c>...</c>
* <d>
* <e>...</e>
* </d>
* </b>
* <f>...</f>
* <g>...</g>
* </a>
*
* and c, e, f and g have values. If the configuration is specified as
*
* b:f,g
*
* the element b is substituted with its children elements that have values namely c and e. The resulting
* configuration is
*
* c,e:f,g
*
* and the product would be (v[c] v[f]), (v[c] v[g]), (v[e] v[f]) and (v[e] v[g])
*
* Grouping can also be specified for a parent element in which case all the children elements of that parent
* element that have values are grouped together.
*
* 5. Multiple products: More than one cartesian product can be specified. They need to be separated using a
* delimiter '|'. Each product is computed independently of the others. An example of this is
*
* a1,a2:b1,b2|c1,c2:d1,d2|e1,e2,e3:f1
*
* @displayName Abstract XML Cartesian Product
* @category Math
* @tags cartesian product, xml, multiple products, dom operator
* @since 1.0.1
*/
public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOperator<T>
{
@NotNull
private String config;
private transient XPath xpath;
private transient PathElementFactory pathElementFactory;
private transient CartesianProductFactory cartesianProductFactory;
protected void processDocument(Document document, T tuple)
{
try {
List<String> result = new ArrayList<String>();
for (CartesianProduct cartesianProduct : cartesianProducts) {
cartesianProduct.product(document, result);
}
processResult(result, tuple);
} catch (XPathExpressionException e) {
DTThrowable.rethrow(e);
}
}
protected abstract void processResult(List<String> result, T tuple);
@Override
public void setup(Context.OperatorContext context)
{
super.setup(context);
xpath = XPathFactory.newInstance().newXPath();
pathElementFactory = new PathElementFactory();
cartesianProductFactory = new CartesianProductFactory();
parseConfig();
}
public void setConfig(String config)
{
this.config = config;
}
public void parseConfig()
{
String[] strprods = config.split("\\|");
cartesianProducts = new CartesianProduct[strprods.length];
for (int i = 0; i < strprods.length; ++i) {
cartesianProducts[i] = cartesianProductFactory.getSpecable(strprods[i]);
}
}
private interface Specable
{
public void parse(String spec);
}
private interface SpecableFactory<T>
{
public T getSpecable(String spec);
}
public interface PathElement extends Specable
{
public void productRight(Node context, PathElement pathElement, List<String> result) throws XPathExpressionException;
public void productLeft(Node context, String value, List<String> result) throws XPathExpressionException;
public List<Node> getValueNodes(Node node) throws XPathExpressionException;
}
public class SimplePathElement implements PathElement
{
public String path;
public void parse(String spec)
{
path = spec;
}
@Override
public void productRight(Node context, PathElement pathElement, List<String> result) throws XPathExpressionException
{
List<Node> nodes = AbstractXmlCartesianProduct.this.getValueNodes(context, path);
for (Node node : nodes) {
String value = getValue(node);
pathElement.productLeft(context, value, result);
}
}
@Override
public void productLeft(Node context, String left, List<String> result) throws XPathExpressionException
{
List<Node> nodes = AbstractXmlCartesianProduct.this.getValueNodes(context, path);
for (Node node : nodes) {
String value = getValue(node);
String product = product(left, value);
result.add(product);
}
}
@Override
public List<Node> getValueNodes(Node context) throws XPathExpressionException
{
return AbstractXmlCartesianProduct.this.getValueNodes(context, path);
}
}
public class GroupPathElement implements PathElement
{
public boolean unified;
public PathElement[] pathElements;
public void parse(String spec)
{
String estr = spec;
if (spec.length() >= 2) {
// Check if it is a unified element, it can have nested elements inside
if (spec.charAt(0) == '(') {
int balance = 1;
int i;
for (i = 1; (i < spec.length()) && (balance > 0); ++i) {
if (spec.charAt(i) == ')') {
balance--;
} else if (spec.charAt(i) == '(') {
balance++;
}
}
if (i == spec.length()) {
estr = spec.substring(1, spec.length() - 1);
unified = true;
}
}
}
String[] selements = estr.split(",");
pathElements = new PathElement[selements.length];
for (int i = 0; i < selements.length; ++i) {
pathElements[i] = pathElementFactory.getSpecable(selements[i]);
}
}
@Override
public void productRight(Node context, PathElement pathElement, List<String> result) throws XPathExpressionException
{
if (!unified) {
for (PathElement ePathElement : pathElements) {
ePathElement.productRight(context, pathElement, result);
}
} else {
List<Node> nodes = getValueNodes(context);
String value = getValue(nodes);
pathElement.productLeft(context, value, result);
}
}
@Override
public void productLeft(Node context, String value, List<String> result) throws XPathExpressionException
{
if (!unified) {
for (PathElement pathElement : pathElements) {
pathElement.productLeft(context, value, result);
}
} else {
List<Node> nodes = getValueNodes(context);
String evalue = getValue(nodes);
String product = product(value, evalue);
result.add(product);
}
}
@Override
public List<Node> getValueNodes(Node context) throws XPathExpressionException
{
List<Node> nodes = new ArrayList<Node>();
for (PathElement pathElement : pathElements) {
nodes.addAll(pathElement.getValueNodes(context));
}
return nodes;
}
}
private interface CartesianProduct extends Specable
{
public void parse(String productSpec);
public void product(Node context, List<String> result) throws XPathExpressionException;
}
public class RegularCartesianProduct implements CartesianProduct
{
public PathElement element1;
public PathElement element2;
@Override
public void parse(String spec)
{
String[] elements = spec.split("\\:");
if (elements.length == 2) {
element1 = pathElementFactory.getSpecable(elements[0]);
element2 = pathElementFactory.getSpecable(elements[1]);
}
}
@Override
public void product(Node context, List<String> result) throws XPathExpressionException
{
element1.productRight(context, element2, result);
}
}
public class SelectionCartesianProduct implements CartesianProduct
{
public SimplePathElement parentElement;
public PathElement childElement1;
public PathElement childElement2;
@Override
public void parse(String productSpec)
{
int seltnDelIdx = productSpec.indexOf("#");
if (seltnDelIdx != -1) {
String parentSpec = productSpec.substring(0, seltnDelIdx);
PathElement pathElement = pathElementFactory.getSpecable(parentSpec);
if (SimplePathElement.class.isAssignableFrom(pathElement.getClass())) {
if (productSpec.length() > (seltnDelIdx + 3)) {
int chldStDelIdx = seltnDelIdx + 1;
int chldEdDelIdx = productSpec.length() - 1;
int chldSepDelIdx;
if ((productSpec.charAt(chldStDelIdx) == '(') && (productSpec.charAt(chldEdDelIdx) == ')')
&& ((chldSepDelIdx = productSpec.indexOf(':')) != -1)) {
String child1Spec = productSpec.substring(chldStDelIdx + 1, chldSepDelIdx);
String child2Spec = productSpec.substring(chldSepDelIdx + 1, chldEdDelIdx);
parentElement = (SimplePathElement)pathElement;
childElement1 = pathElementFactory.getSpecable(child1Spec);
childElement2 = pathElementFactory.getSpecable(child2Spec);
}
}
}
}
}
@Override
public void product(Node context, List<String> result) throws XPathExpressionException
{
NodeList nodes = getNodes(context, parentElement.path);
for (int i = 0; i < nodes.getLength(); ++i) {
childElement1.productRight(nodes.item(i), childElement2, result);
}
}
}
private class PathElementFactory implements SpecableFactory<PathElement>
{
@Override
public PathElement getSpecable(String spec)
{
PathElement pathElement = null;
if (spec.matches("[^,(]*")) {
pathElement = new SimplePathElement();
} else {
pathElement = new GroupPathElement();
}
pathElement.parse(spec);
return pathElement;
}
}
private class CartesianProductFactory implements SpecableFactory<CartesianProduct>
{
@Override
public CartesianProduct getSpecable(String spec)
{
CartesianProduct product = null;
if (spec.indexOf("#") == -1) {
product = new RegularCartesianProduct();
} else {
product = new SelectionCartesianProduct();
}
if (product != null) {
product.parse(spec);
}
return product;
}
}
private List<Node> getNodes(Document document, String path) throws XPathExpressionException
{
XPathExpression pathExpr = xpath.compile(path);
NodeList nodeList = (NodeList)pathExpr.evaluate(document, XPathConstants.NODESET);
List<Node> nodes = new ArrayList<Node>();
for (int i = 0; i < nodeList.getLength(); ++i) {
nodes.add(nodeList.item(i));
}
return nodes;
}
protected List<Node> getValueNodes(Node node, String path) throws XPathExpressionException
{
NodeList nodeList = getNodes(node, path);
List<Node> nodes = new ArrayList<Node>();
getValueNodes(nodeList, nodes);
return nodes;
}
private NodeList getNodes(Node node, String path) throws XPathExpressionException
{
XPathExpression pathExpr = xpath.compile(path);
return (NodeList)pathExpr.evaluate(node, XPathConstants.NODESET);
}
protected void getValueNodes(NodeList nodes, List<Node> textNodes)
{
for (int i = 0; i < nodes.getLength(); ++i) {
Node node = nodes.item(i);
if (isValueNode(node)) {
textNodes.add(node);
} else {
getValueNodes(node.getChildNodes(), textNodes);
}
}
}
public String getValue(List<Node> nodes)
{
StringBuilder sb = new StringBuilder();
String delim = getDelim();
boolean first = true;
for (Node node : nodes) {
if (!first) {
sb.append(delim);
} else {
first = false;
}
sb.append(getValue(node));
}
return sb.toString();
}
public String getDelim()
{
return ",";
}
public String product(String left, String right)
{
StringBuilder sb = new StringBuilder();
sb.append(left).append(getDelim()).append(right);
return sb.toString();
}
protected abstract boolean isValueNode(Node node);
protected abstract String getValue(Node node);
private transient CartesianProduct[] cartesianProducts;
}