// blob: 230e0ba0c0f3db5c77e87f47965ef14de3333e9e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pirk.schema.query;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pirk.utils.PIRException;
import org.apache.pirk.utils.SystemConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
/**
* Class to load any query schemas specified in the properties file, 'query.schemas'
* <p>
* Schemas should be specified as follows:
*
* <pre>
* {@code
* <schema>
* <schemaName> name of the schema </schemaName>
* <dataSchemaName> name of the data schema over which this query is run </dataSchemaName>
* <selectorName> name of the element in the data schema that will be the selector </selectorName>
* <elements>
* <name> element name of element in the data schema to include in the query response; just
* as with the data schema, the element name is case sensitive</name>
* </elements>
* <filter> (optional) name of the filter class to use to filter the data </filter>
* <filterNames> (optional)
* <name> element name of element in the data schema to apply pre-processing filters </name>
* </filterNames>
* <additional> (optional) additional fields for the query schema, in <key,value> pairs
* <field>
* <key> key corresponding to the field </key>
* <value> value corresponding to the field </value>
* </field>
* </additional>
* </schema>
* }
* </pre>
* <p>
* TODO: Allow the schema to specify how many array elements to return per element, if the element is an array type
*/
public class QuerySchemaLoader
{
  private static final Logger logger = LoggerFactory.getLogger(QuerySchemaLoader.class);

  // Load the schemas named by the 'query.schemas' property when this class is initialized.
  static
  {
    logger.info("Loading query schemas: ");

    try
    {
      initialize();
    } catch (PIRException e)
    {
      // Hand the exception itself to the logger so the cause and stack trace are not lost.
      logger.error(e.getLocalizedMessage(), e);
    }
  }

  /* Kept for compatibility */
  /**
   * Initializes the static {@link QuerySchemaRegistry} with the query schemas listed in the 'query.schemas' property, reading them as regular (non-hdfs)
   * files.
   *
   * @throws PIRException
   *           - failed to initialize
   */
  public static void initialize() throws PIRException
  {
    initialize(false, null);
  }

  /* Kept for compatibility */
  /**
   * Initializes the static {@link QuerySchemaRegistry} with the query schemas listed in the 'query.schemas' property.
   * <p>
   * The property is a comma separated list of schema file names; whitespace around each name is ignored. If the property is unset, or set to "none", nothing
   * is loaded.
   *
   * @param hdfs
   *          If true, specifies that the query schema is an hdfs file; if false, that it is a regular file.
   * @param fs
   *          Used only when {@code hdfs} is true; the {@link FileSystem} handle for the hdfs in which the query schema exists
   * @throws PIRException
   *           - failed to initialize the query schemas because they could not be read or are invalid, or because {@code hdfs} is true and no {@code fs} was
   *           given.
   */
  public static void initialize(boolean hdfs, FileSystem fs) throws PIRException
  {
    String querySchemas = SystemConfiguration.getProperty("query.schemas", "none");
    if (querySchemas.equals("none"))
    {
      logger.info("query.schemas = none");
      return;
    }

    // Report a missing file system handle as a schema loading failure rather than a bare NullPointerException.
    if (hdfs && fs == null)
    {
      throw new PIRException("A FileSystem must be provided to load query schemas from hdfs");
    }

    try
    {
      for (String schemaFile : querySchemas.split(","))
      {
        // Tolerate lists written as "a.xml, b.xml".
        QuerySchema querySchema = readSchemaFile(schemaFile.trim(), fs, hdfs);
        QuerySchemaRegistry.put(querySchema);
      }
    } catch (IOException e)
    {
      throw new PIRException("Error reading query schema", e);
    }
  }

  /**
   * Reads a single query schema file and parses it into a {@link QuerySchema}.
   *
   * @param schemaFile
   *          The path of the schema file.
   * @param fs
   *          The file system used to open the file when {@code hdfs} is true; unused otherwise.
   * @param hdfs
   *          If true the file is opened through {@code fs}, otherwise as a regular file.
   * @return The parsed query schema.
   * @throws IOException
   *           - the file could not be opened or read
   * @throws PIRException
   *           - the schema description is invalid
   */
  private static QuerySchema readSchemaFile(String schemaFile, FileSystem fs, boolean hdfs) throws IOException, PIRException
  {
    logger.info("Loading query schemaFile = {}", schemaFile);

    // try-with-resources closes the stream on every path and keeps the primary exception if close() also fails.
    try (InputStream is = openSchemaStream(schemaFile, fs, hdfs))
    {
      return new QuerySchemaLoader().loadSchema(is);
    }
  }

  /**
   * Opens the given schema file, either through the supplied file system or as a regular file.
   *
   * @param schemaFile
   *          The path of the schema file.
   * @param fs
   *          The file system used when {@code hdfs} is true.
   * @param hdfs
   *          Selects between the supplied file system and a regular file.
   * @return An open stream over the schema file; the caller is responsible for closing it.
   * @throws IOException
   *           - the file could not be opened
   */
  private static InputStream openSchemaStream(String schemaFile, FileSystem fs, boolean hdfs) throws IOException
  {
    if (hdfs)
    {
      logger.info("hdfs: filePath = {}", schemaFile);
      return fs.open(new Path(schemaFile));
    }

    logger.info("localFS: inputFile = {}", schemaFile);
    return new FileInputStream(schemaFile);
  }

  /**
   * Default constructor.
   */
  public QuerySchemaLoader()
  {
  }

  /**
   * Returns the query schema as defined in XML format on the given stream.
   *
   * @param stream
   *          The source of the XML query schema description.
   * @return The query schema.
   * @throws IOException
   *           A problem occurred reading from the given stream.
   * @throws PIRException
   *           The schema description is invalid.
   */
  public QuerySchema loadSchema(InputStream stream) throws IOException, PIRException
  {
    // Read in and parse the XML file.
    Document doc = parseXMLDocument(stream);

    // Used to build the final schema.
    QuerySchemaBuilder schemaBuilder = new QuerySchemaBuilder();

    // Extract the schemaName.
    String schemaName = extractValue(doc, "schemaName");
    schemaBuilder.setName(schemaName);
    logger.info("schemaName = {}", schemaName);

    // Extract the dataSchemaName.
    String dataSchemaName = extractValue(doc, "dataSchemaName");
    schemaBuilder.setDataSchemaName(dataSchemaName);
    logger.info("dataSchemaName = {}", dataSchemaName);

    // Extract the selectorName.
    String selectorName = extractValue(doc, "selectorName");
    schemaBuilder.setSelectorName(selectorName);
    logger.info("selectorName = {}", selectorName);

    // Extract the query elements; exactly one <elements> section is required.
    NodeList elementsList = doc.getElementsByTagName("elements");
    if (elementsList.getLength() != 1)
    {
      throw new PIRException("elementsList.getLength() = " + elementsList.getLength() + " -- should be 1");
    }
    LinkedHashSet<String> elementNames = extractNames((Element) elementsList.item(0), "elements");
    schemaBuilder.setQueryElementNames(elementNames);

    // Extract the filter, if it exists.
    Node filterNode = doc.getElementsByTagName("filter").item(0);
    if (filterNode != null)
    {
      schemaBuilder.setFilterTypeName(filterNode.getTextContent().trim());
    }

    // Create a filter over the query elements.
    schemaBuilder.setFilteredElementNames(extractFilteredElementNames(doc));

    // Extract the additional fields, if they exist.
    schemaBuilder.setAdditionalFields(extractAdditionalFields(doc));

    // Create and return the query schema object.
    return schemaBuilder.build();
  }

  /**
   * Parses and normalizes the XML document available on the given stream.
   *
   * @param stream
   *          The input stream.
   * @return A Document representing the XML document.
   * @throws IOException
   *           - failed to read input
   * @throws PIRException
   *           - file could not be parsed, or the XML parser could not be configured
   */
  private Document parseXMLDocument(InputStream stream) throws IOException, PIRException
  {
    Document doc;
    try
    {
      DocumentBuilder dBuilder = newDocumentBuilderFactory().newDocumentBuilder();
      doc = dBuilder.parse(stream);
    } catch (ParserConfigurationException | SAXException e)
    {
      throw new PIRException("Schema parsing error", e);
    }
    doc.getDocumentElement().normalize();
    logger.info("Root element: {}", doc.getDocumentElement().getNodeName());

    return doc;
  }

  /**
   * Creates a parser factory that does not resolve external entities or external DTDs, so that a schema file cannot make the loader read other files or
   * contact remote hosts (XXE).
   * <p>
   * DOCTYPE declarations themselves are still accepted, so existing schema files that carry one continue to load.
   *
   * @return The configured factory.
   * @throws ParserConfigurationException
   *           - the underlying parser does not support one of the requested features
   */
  private static DocumentBuilderFactory newDocumentBuilderFactory() throws ParserConfigurationException
  {
    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    factory.setFeature("http://xml.org/sax/features/external-general-entities", false);
    factory.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
    factory.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
    return factory;
  }

  /**
   * Returns the possibly empty set of element names over which the filter is applied, maintaining document order.
   *
   * @param doc
   *          An XML document specifying names upon which we will filter the query.
   * @return The set of names upon which we will filter the query.
   * @throws PIRException
   *           - more than one {@code <filterNames>} section was found, or a name in it is empty
   */
  private Set<String> extractFilteredElementNames(Document doc) throws PIRException
  {
    NodeList filterNamesList = doc.getElementsByTagName("filterNames");
    if (filterNamesList.getLength() == 0)
    {
      // The section is optional; a LinkedHashSet keeps the documented ordering guarantee uniform.
      return new LinkedHashSet<>();
    }
    if (filterNamesList.getLength() > 1)
    {
      throw new PIRException("filterNamesList.getLength() = " + filterNamesList.getLength() + " -- should be 0 or 1");
    }

    // Extract element names from the list.
    Set<String> filteredNamesSet = extractNames((Element) filterNamesList.item(0), "filterNames");
    for (String name : filteredNamesSet)
    {
      logger.info("filterName = {}", name);
    }
    return filteredNamesSet;
  }

  /**
   * Collects the trimmed text of every {@code <name>} element beneath the given parent, in document order.
   *
   * @param parent
   *          The element whose {@code <name>} descendants are read.
   * @param parentTag
   *          The tag name of {@code parent}, used only in error messages.
   * @return The names found, in document order and without duplicates.
   * @throws PIRException
   *           - a {@code <name>} element has no text
   */
  private LinkedHashSet<String> extractNames(Element parent, String parentTag) throws PIRException
  {
    LinkedHashSet<String> names = new LinkedHashSet<>();

    // getElementsByTagName only ever returns Element nodes, so no node type check is needed.
    NodeList nameNodes = parent.getElementsByTagName("name");
    for (int i = 0; i < nameNodes.getLength(); i++)
    {
      // getTextContent never fails on an empty element and skips comments, unlike getFirstChild().getNodeValue().
      String name = nameNodes.item(i).getTextContent().trim();
      if (name.isEmpty())
      {
        throw new PIRException("Empty <name> element found in <" + parentTag + ">");
      }
      names.add(name);
    }
    return names;
  }

  /**
   * Returns the additional key/value fields of the schema, or an empty map if the document has no {@code <additional>} section.
   *
   * @param doc
   *          The XML document from which we extract data
   * @return The additional fields, keyed by the text of each {@code <key>}.
   * @throws PIRException
   *           - an {@code <additional>} section is present but the document contains no {@code <field>}
   */
  private Map<String,String> extractAdditionalFields(Document doc) throws PIRException
  {
    Map<String,String> additionalFields = new HashMap<>();
    if (doc.getElementsByTagName("additional").item(0) == null)
    {
      return additionalFields;
    }

    NodeList fieldList = doc.getElementsByTagName("field");
    int numFields = fieldList.getLength();
    if (numFields == 0)
    {
      throw new PIRException("numFields = " + numFields + " -- should be at least one");
    }
    for (int i = 0; i < numFields; ++i)
    {
      NodeList kv = fieldList.item(i).getChildNodes();
      additionalFields.put(getNodeValue("key", kv), getNodeValue("value", kv));
    }
    return additionalFields;
  }

  /**
   * Extracts a top level, single value from the XML structure.
   *
   * Throws an exception if there is not exactly one tag with the given name.
   *
   * @param doc
   *          The XML document from which we extract data
   * @param tagName
   *          The name of the tag we wish to extract from the {@code doc}
   * @return The trimmed text content of the tag.
   * @throws PIRException
   *           - the document does not contain exactly one tag with the given name
   */
  private String extractValue(Document doc, String tagName) throws PIRException
  {
    NodeList itemList = doc.getElementsByTagName(tagName);
    if (itemList.getLength() != 1)
    {
      throw new PIRException("itemList.getLength() = " + itemList.getLength() + " -- should be 1");
    }
    return itemList.item(0).getTextContent().trim();
  }

  /**
   * Extracts the value corresponding to a given tag from the XML nodeList
   *
   * @param tagName
   *          The name of the tag for which to extract the value
   * @param nodes
   *          The NodeList
   * @return The trimmed text of the last node named {@code tagName}, or the empty string if there is no such node or it has no text.
   */
  private String getNodeValue(String tagName, NodeList nodes)
  {
    String value = "";
    for (int x = 0; x < nodes.getLength(); x++)
    {
      Node node = nodes.item(x);
      if (node.getNodeName().equals(tagName))
      {
        // getTextContent copes with an element that has no children, where item(0) would be null.
        value = node.getTextContent().trim();
      }
    }
    return value;
  }
}