Merge branch 'master' into steven/hdfs
diff --git a/pom.xml b/pom.xml
index 8a97a92..4cabe8e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,8 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -295,6 +296,25 @@
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-hdfs-2.x</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>${hyracks.version}</version>
+ <type>jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>2.7.0</version>
+ </dependency>
+
+ <dependency>
<groupId>ant</groupId>
<artifactId>ant-trax</artifactId>
<version>1.6.5</version>
@@ -340,7 +360,7 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
- <version>1.3.2</version>
+ <version>2.4</version>
</dependency>
<dependency>
@@ -359,9 +379,10 @@
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
- <version>6.1.4</version>
+ <version>6.1.22</version>
<scope>compile</scope>
</dependency>
+
</dependencies>
</dependencyManagement>
@@ -437,19 +458,21 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
- <!-- We override the configuration plugin to override the descriptor to use for building
- the source release zip. Specifically, we would like to control the inclusions/exclusions.
- For example, we exclude the KEYS file from the zip -->
+ <!-- We override the configuration plugin to override the descriptor
+ to use for building the source release zip. Specifically, we would like to
+ control the inclusions/exclusions. For example, we exclude the KEYS file
+ from the zip -->
<executions>
<execution>
- <!-- Use this id to match the id mentioned in the assembly plugin configuration in
- the apache parent POM under the apache-release profile -->
+ <!-- Use this id to match the id mentioned in the assembly plugin configuration
+ in the apache parent POM under the apache-release profile -->
<id>source-release-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
- <!-- combine.self should be override to replace the configuration in the parent POM -->
+ <!-- combine.self should be overridden to replace the configuration in
+ the parent POM -->
<configuration combine.self="override">
<runOnlyAtExecutionRoot>true</runOnlyAtExecutionRoot>
<descriptors>
@@ -491,6 +514,29 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
</plugin>
+ <plugin>
+ <groupId>com.google.code.maven-replacer-plugin</groupId>
+ <artifactId>replacer</artifactId>
+ <version>1.5.3</version>
+ <executions>
+ <execution>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>replace</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <ignoreMissingFile>true</ignoreMissingFile>
+ <file>vxquery-server/src/main/resources/conf/cluster.properties</file>
+ <outputFile>
+ vxquery-server/src/main/resources/conf/cluster.properties
+ </outputFile>
+ <regex>false</regex>
+ <token>$CONF_PATH$</token>
+ <value>${basedir}/vxquery-xtest/src/test/resources/hadoop/conf</value>
+ </configuration>
+ </plugin>
</plugins>
</build>
@@ -541,6 +587,7 @@
<maxmemory>2g</maxmemory>
</configuration>
</plugin>
+
</plugins>
</reporting>
diff --git a/src/site/apt/user_query.apt b/src/site/apt/user_query.apt
index 8ea6429..c5132c3 100644
--- a/src/site/apt/user_query.apt
+++ b/src/site/apt/user_query.apt
@@ -49,6 +49,7 @@
-showrp : Show Runtime plan
-showtet : Show translated expression tree
-timing : Produce timing information
+-hdfs-conf VAL : The folder containing the HDFS configuration files
----------------------------------------
* Java Options
diff --git a/src/site/apt/user_query_hdfs.apt b/src/site/apt/user_query_hdfs.apt
new file mode 100644
index 0000000..fa736b8
--- /dev/null
+++ b/src/site/apt/user_query_hdfs.apt
@@ -0,0 +1,180 @@
+~~ Licensed to the Apache Software Foundation (ASF) under one or more
+~~ contributor license agreements. See the NOTICE file distributed with
+~~ this work for additional information regarding copyright ownership.
+~~ The ASF licenses this file to You under the Apache License, Version 2.0
+~~ (the "License"); you may not use this file except in compliance with
+~~ the License. You may obtain a copy of the License at
+~~
+~~ http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License.
+
+Executing a Query in HDFS
+
+
+* 1. Connecting VXQuery with HDFS
+
+ In order to read HDFS data, VXQuery needs access to the HDFS configuration
+ directory, which contains:
+
+ core-site.xml
+ hdfs-site.xml
+ mapred-site.xml
+
+ Some systems may automatically set this directory as a system environment
+ variable ("HADOOP_CONF_DIR"). If this is the case, VXQuery will retrieve
+ it from the environment when attempting to perform HDFS queries.
+
+ When this variable is not set, users will need to provide this directory as
+ a Command Line Option when executing VXQuery:
+ -hdfs-conf /path/to/hdfs/conf_folder
+
+
+* 2. Running the Query
+
+
+ For files stored in HDFS there are 2 ways to access them from VXQuery.
+
+
+ [[a]] Reading them as whole files.
+
+
+ [[b]] Reading them block by block.
+
+
+** a. Reading them as whole files.
+
+ For this option you only need to change the path to files. To define that your
+ file(s) exist and should be read from HDFS you must add <"hdfs://"> in front
+ of the path. VXQuery will read the path of the files you request in your query
+ and try to locate them.
+
+
+ So in order to run a query that will read the input files from HDFS you need
+ to make sure that
+
+
+ a) The environment variable "HADOOP_CONF_DIR" is set, or you pass the
+ directory location using -hdfs-conf
+
+
+ b) The path defined in your query begins with <hdfs://> and the full path to
+ the file(s).
+
+
+ c) The path exists on HDFS and the user that runs the query has read permission
+ to these files.
+
+
+*** Example
+
+ I want to find all the <books> that are published after 2004.
+
+
+ The file is located in HDFS in this path </user/hduser/store/books.xml>
+
+
+ My query will look like this:
+
+
+----------
+for $x in collection("hdfs://user/hduser/store")
+where $x/year>2004
+return $x/title
+----------
+
+
+ If I want only one file, the <<books.xml>> to be parsed from HDFS, my query will
+ look like this:
+
+
+----------
+for $x in doc("hdfs://user/hduser/store/books.xml")
+where $x/year>2004
+return $x/title
+----------
+
+
+** b. Reading them block by block
+
+
+ In order to use that option you need to modify your query. Instead of using the
+ <collection> or <doc> function to define your input file(s) you need to use
+ <collection-with-tag>.
+
+
+ <collection-with-tag> accepts two arguments, one is the path to the HDFS directory
+ you have stored your input files, and the second is a specific <<tag>> that exists
+ in the input file(s). This is the tag of the element that contains the fields that
+ your query is looking for.
+
+ Other than these arguments, you do not need to change anything else in the query.
+
+ Note: since this strategy is optimized to read block by block, the result will
+ include all elements with the given tag, regardless of depth within the xml tree.
+
+
+*** Example
+
+ The same example, using <<collection-with-tag>>.
+
+ My input file <books.xml>:
+
+-----------------------------
+<?xml version="1.0" encoding="UTF-8"?>
+<bookstore>
+
+<book>
+ <title lang="en">Everyday Italian</title>
+ <author>Giada De Laurentiis</author>
+ <year>2005</year>
+ <price>30.00</price>
+</book>
+
+<book>
+ <title lang="en">Harry Potter</title>
+ <author>J K. Rowling</author>
+ <year>2005</year>
+ <price>29.99</price>
+</book>
+
+<book>
+ <title lang="en">XQuery Kick Start</title>
+ <author>James McGovern</author>
+ <author>Per Bothner</author>
+ <author>Kurt Cagle</author>
+ <author>James Linn</author>
+ <author>Vaidyanathan Nagarajan</author>
+ <year>2003</year>
+ <price>49.99</price>
+</book>
+
+<book>
+ <title lang="en">Learning XML</title>
+ <author>Erik T. Ray</author>
+ <year>2003</year>
+ <price>39.95</price>
+</book>
+
+</bookstore>
+----------------------------
+
+
+ My query will look like this:
+
+
+----------------------------
+for $x in collection-with-tag("hdfs://user/hduser/store","book")/book
+where $x/year>2004
+return $x/title
+----------------------------
+
+
+ Note that I defined the path to the directory containing the file(s),
+ not the file itself; <collection-with-tag> expects the path to a directory. I also
+ added </book> after the function call. This is needed, as with the <collection> and
+ <doc> functions, for the query to be parsed correctly.
diff --git a/src/site/site.xml b/src/site/site.xml
index 5640976..d64fe0d 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -64,6 +64,9 @@
name="Executing a Query"
href="user_query.html" />
<item
+ name="Using HDFS with VXQuery"
+ href="user_query_hdfs.html" />
+ <item
name="Running the Test Suite"
href="user_running_tests.html" />
</menu>
diff --git a/vxquery-cli/pom.xml b/vxquery-cli/pom.xml
index 7bf1ff9..c4e11cb 100644
--- a/vxquery-cli/pom.xml
+++ b/vxquery-cli/pom.xml
@@ -126,7 +126,19 @@
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-hdfs-2.x</artifactId>
+ </dependency>
</dependencies>
+
+
<reporting>
<plugins>
diff --git a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
index a02c65d..17287c6 100644
--- a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
+++ b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
@@ -81,7 +81,8 @@
/**
* Constructor to use command line options passed.
*
- * @param opts Command line options object
+ * @param opts
+ * Command line options object
*/
public VXQuery(CmdLineOptions opts) {
this.opts = opts;
@@ -181,8 +182,13 @@
opts.showOET, opts.showRP);
start = opts.timing ? new Date() : null;
- XMLQueryCompiler compiler = new XMLQueryCompiler(listener, getNodeList(), opts.frameSize,
- opts.availableProcessors, opts.joinHashSize, opts.maximumDataSize);
+
+ Map<String, NodeControllerInfo> nodeControllerInfos = null;
+ if (hcc != null) {
+ nodeControllerInfos = hcc.getNodeControllerInfos();
+ }
+ XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeControllerInfos, opts.frameSize,
+ opts.availableProcessors, opts.joinHashSize, opts.maximumDataSize, opts.hdfsConf);
resultSetId = createResultSetId();
CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE),
resultSetId, null);
@@ -233,30 +239,13 @@
}
/**
- * Get cluster node configuration.
- *
- * @return Configuration of node controllers as array of Strings.
- * @throws Exception
- */
- private String[] getNodeList() throws Exception {
- if (hcc != null) {
- Map<String, NodeControllerInfo> nodeControllerInfos = hcc.getNodeControllerInfos();
- String[] nodeList = new String[nodeControllerInfos.size()];
- int index = 0;
- for (String node : nodeControllerInfos.keySet()) {
- nodeList[index++] = node;
- }
- return nodeList;
- }
- return new String[0];
- }
-
- /**
* Creates a Hyracks dataset, if not already existing with the job frame size, and 1 reader. Allocates a new buffer of size specified in the frame of Hyracks
* node. Creates new dataset reader with the current job ID and result set ID. Outputs the string in buffer for each frame.
*
- * @param spec JobSpecification object, containing frame size. Current specified job.
- * @param writer Writer for output of job.
+ * @param spec
+ * JobSpecification object, containing frame size. Current specified job.
+ * @param writer
+ * Writer for output of job.
* @throws Exception
*/
private void runJob(JobSpecification spec, PrintWriter writer) throws Exception {
@@ -339,7 +328,8 @@
/**
* Reads the contents of file given in query into a String. The file is always closed. For XML files UTF-8 encoding is used.
*
- * @param query The query with filename to be processed
+ * @param query
+ * The query with filename to be processed
* @return UTF-8 formatted query string
* @throws IOException
*/
@@ -361,8 +351,7 @@
* Helper class with fields and methods to handle all command line options
*/
private static class CmdLineOptions {
- @Option(name = "-available-processors",
- usage = "Number of available processors. (default: java's available processors)")
+ @Option(name = "-available-processors", usage = "Number of available processors. (default: java's available processors)")
private int availableProcessors = -1;
@Option(name = "-client-net-ip-address", usage = "IP Address of the ClusterController.")
@@ -422,6 +411,9 @@
@Option(name = "-x", usage = "Bind an external variable")
private Map<String, String> bindings = new HashMap<String, String>();
+ @Option(name = "-hdfs-conf", usage = "Directory path to Hadoop configuration files")
+ private String hdfsConf = null;
+
@Argument
private List<String> arguments = new ArrayList<String>();
}
diff --git a/vxquery-core/pom.xml b/vxquery-core/pom.xml
index 59cc987..d244818 100644
--- a/vxquery-core/pom.xml
+++ b/vxquery-core/pom.xml
@@ -14,7 +14,8 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -63,7 +64,8 @@
<configuration>
<target>
<ant antfile="build-xslt.xml" target="build">
- <property name="build.gen-src.dir" value="${project.build.directory}/generated-sources/main/java" />
+ <property name="build.gen-src.dir"
+ value="${project.build.directory}/generated-sources/main/java" />
<property name="src.code.dir" value="${basedir}/src/main/java" />
<property name="xslt.dir" value="${basedir}/src/main/xslt" />
<property name="classpath.xslt" refid="maven.compile.classpath" />
@@ -80,7 +82,8 @@
<configuration>
<target>
<ant antfile="build-site.xml" target="build">
- <property name="build.gen-site.dir" value="${project.build.directory}/generated-site/apt" />
+ <property name="build.gen-site.dir"
+ value="${project.build.directory}/generated-site/apt" />
<property name="src.code.dir" value="${basedir}/src/main/java" />
<property name="xslt.dir" value="${basedir}/src/main/xslt" />
<property name="classpath.xslt" refid="maven.compile.classpath" />
@@ -121,16 +124,13 @@
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
</plugin>
- <!--
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-site-plugin</artifactId>
- </plugin>
- -->
+ <!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-site-plugin</artifactId>
+ </plugin> -->
</plugins>
<pluginManagement>
<plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <!--This plugin's configuration is used to store Eclipse m2e settings
+ only. It has no influence on the Maven build itself. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
@@ -209,6 +209,16 @@
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-hdfs-2.x</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>ant</groupId>
<artifactId>ant-trax</artifactId>
<scope>provided</scope>
@@ -259,6 +269,21 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>2.7.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.7.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
</dependencies>
<reporting>
@@ -290,18 +315,10 @@
<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
</configuration>
</plugin>
- <!--
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>findbugs-maven-plugin</artifactId>
- <version>2.5.2</version>
- <configuration>
- <effort>Min</effort>
- <threshold>Normal</threshold>
- <excludeFilterFile>findbugs-exclude.xml</excludeFilterFile>
- </configuration>
- </plugin>
- -->
+ <!-- <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.5.2</version> <configuration> <effort>Min</effort> <threshold>Normal</threshold>
+ <excludeFilterFile>findbugs-exclude.xml</excludeFilterFile> </configuration>
+ </plugin> -->
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
index fd20465..205e0b2 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
@@ -19,8 +19,8 @@
import java.util.LinkedList;
import java.util.List;
-import org.apache.vxquery.compiler.rewriter.rules.ConsolidateDescandantChild;
import org.apache.vxquery.compiler.rewriter.rules.ConsolidateAssignAggregateRule;
+import org.apache.vxquery.compiler.rewriter.rules.ConsolidateDescandantChild;
import org.apache.vxquery.compiler.rewriter.rules.ConvertAssignToUnnestRule;
import org.apache.vxquery.compiler.rewriter.rules.ConvertFromAlgebricksExpressionsRule;
import org.apache.vxquery.compiler.rewriter.rules.ConvertToAlgebricksExpressionsRule;
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
index 53011d2..74220da 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
@@ -22,16 +22,6 @@
import java.util.Arrays;
import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
-import org.apache.vxquery.compiler.rewriter.rules.util.OperatorToolbox;
-import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-import org.apache.vxquery.datamodel.values.ValueTag;
-import org.apache.vxquery.functions.BuiltinFunctions;
-import org.apache.vxquery.types.BuiltinTypeRegistry;
-import org.apache.vxquery.types.Quantifier;
-import org.apache.vxquery.types.SequenceType;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -46,6 +36,14 @@
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
+import org.apache.vxquery.compiler.rewriter.rules.util.OperatorToolbox;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.functions.BuiltinFunctions;
+import org.apache.vxquery.types.BuiltinTypeRegistry;
+import org.apache.vxquery.types.Quantifier;
+import org.apache.vxquery.types.SequenceType;
public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
@@ -54,14 +52,13 @@
final TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
/**
- * Get the constant value for the collection. Return null for not a collection.
+ * Get the arguments for the collection and collection-with-tag functions. Return null if this is not a collection call.
*
* @param opRef
* Logical operator
* @return collection name
*/
- protected String getCollectionName(Mutable<ILogicalOperator> opRef) {
- VXQueryConstantValue constantValue;
+ protected String[] getCollectionName(Mutable<ILogicalOperator> opRef) {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
@@ -82,13 +79,32 @@
return null;
}
AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression;
- if (!functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier())) {
+ if (!functionCall.getFunctionIdentifier()
+ .equals(BuiltinFunctions.FN_COLLECTION_WITH_TAG_2.getFunctionIdentifier())
+ && !functionCall.getFunctionIdentifier()
+ .equals(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier())) {
return null;
}
- ILogicalExpression logicalExpression2 = (ILogicalExpression) functionCall.getArguments().get(0).getValue();
+ // Get arguments
+ int size = functionCall.getArguments().size();
+ if (size > 0) {
+ String args[] = new String[size];
+ for (int i = 0; i < size; i++) {
+ args[i] = getArgument(functionCall, opRef, i);
+ }
+ return args;
+ }
+ return null;
+ }
+
+ private String getArgument(AbstractFunctionCallExpression functionCall, Mutable<ILogicalOperator> opRef, int pos) {
+ VXQueryConstantValue constantValue;
+ ILogicalExpression logicalExpression2 = (ILogicalExpression) functionCall.getArguments().get(pos).getValue();
if (logicalExpression2.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
return null;
+ } else if (logicalExpression2 == null) {
+ return null;
}
VariableReferenceExpression vre = (VariableReferenceExpression) logicalExpression2;
Mutable<ILogicalOperator> opRef3 = OperatorToolbox.findProducerOf(opRef, vre.getVariableReference());
@@ -111,7 +127,6 @@
} else {
return null;
}
-
// Constant value is now in a TaggedValuePointable. Convert the value into a java String.
tvp.set(constantValue.getValue(), 0, constantValue.getValue().length);
String collectionName = null;
@@ -121,11 +136,12 @@
bbis.setByteBuffer(ByteBuffer.wrap(Arrays.copyOfRange(stringp.getByteArray(), stringp.getStartOffset(),
stringp.getLength() + stringp.getStartOffset())), 0);
collectionName = di.readUTF();
+ return collectionName;
} catch (IOException e) {
e.printStackTrace();
}
}
- return collectionName;
+ return null;
}
@Override
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
index cc857a1..8ed8bb1 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
@@ -64,9 +64,10 @@
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
VXQueryOptimizationContext vxqueryContext = (VXQueryOptimizationContext) context;
- String collectionName = getCollectionName(opRef);
+ String args[] = getCollectionName(opRef);
- if (collectionName != null) {
+ if (args != null) {
+ String collectionName = args[0];
// Build the new operator and update the query plan.
int collectionId = vxqueryContext.newCollectionId();
VXQueryCollectionDataSource ds = VXQueryCollectionDataSource.create(collectionId, collectionName,
@@ -74,6 +75,12 @@
if (ds != null) {
ds.setTotalDataSources(vxqueryContext.getTotalDataSources());
+ // Check if the call is for collection-with-tag
+ if (args.length == 2) {
+ ds.setTotalDataSources(vxqueryContext.getTotalDataSources());
+ ds.setTag(args[1]);
+ }
+
// Known to be true because of collection name.
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
UnnestOperator unnest = (UnnestOperator) op;
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
index 38f03a4..3b9371d 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
+++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
@@ -128,6 +128,14 @@
<!-- Collection operator is added during the rewrite rules phase. -->
</function>
+ <!-- fn:collection-with-tag($arg1 as xs:string?, $arg2 as xs:string?) as node()* -->
+ <function name="fn:collection-with-tag">
+ <param name="arg1" type="xs:string?"/>
+ <param name="arg2" type="xs:string?"/>
+ <return type="node()*"/>
+ <!-- CollectionWithTag operator is added during the rewrite rules phase. -->
+ </function>
+
<!-- fn:compare($comparand1 as xs:string?, $comparand2 as xs:string?) as xs:integer? -->
<function name="fn:compare">
<param name="comparand1" type="xs:string?"/>
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
new file mode 100644
index 0000000..dcbfe94
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.hdfs2;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.hdfs.ContextFactory;
+import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+public class HDFSFunctions {
+
+ private Configuration conf;
+ private FileSystem fs;
+ private String conf_path;
+ private Job job;
+ private InputFormat inputFormat;
+ private List<InputSplit> splits;
+ private ArrayList<ArrayList<String>> nodes;
+ private HashMap<Integer, String> schedule;
+ private final String TEMP = "java.io.tmpdir";
+ private final String dfs_path = "vxquery_splits_schedule.txt";
+ private final String filepath = System.getProperty(TEMP) + "splits_schedule.txt";
+ protected static final Logger LOGGER = Logger.getLogger(HDFSFunctions.class.getName());
+ private final Map<String, NodeControllerInfo> nodeControllerInfos;
+
+ /**
+ * Create the configuration and add the paths for core-site and hdfs-site as resources.
+ * Initialize an instance of HDFS FileSystem for this configuration.
+ *
+ * @param nodeControllerInfos
+ * @param hdfsConf
+ */
+ public HDFSFunctions(Map<String, NodeControllerInfo> nodeControllerInfos, String hdfsConf) {
+ this.conf = new Configuration();
+ this.nodeControllerInfos = nodeControllerInfos;
+ this.conf_path = hdfsConf;
+ }
+
+ /**
+ * Create the needed objects for reading the splits of the filepath given as argument.
+ * This method should run before the scheduleSplits method.
+ *
+ * @param filepath
+ */
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ public void setJob(String filepath, String tag) {
+ try {
+ conf.set("start_tag", "<" + tag + ">");
+ conf.set("end_tag", "</" + tag + ">");
+ job = new Job(conf, "Read from HDFS");
+ Path input = new Path(filepath);
+ FileInputFormat.addInputPath(job, input);
+ job.setInputFormatClass(XmlCollectionWithTagInputFormat.class);
+ inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+ splits = inputFormat.getSplits(job);
+ } catch (IOException | ClassNotFoundException | InterruptedException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Returns true if the file path exists or it is located somewhere in the home directory of the user that called the function.
+ * Searches in subdirectories of the home directory too.
+ *
+ * @param filename
+ * @return
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ public boolean isLocatedInHDFS(String filename) throws IllegalArgumentException, IOException {
+ //search file path
+ if (fs.exists(new Path(filename))) {
+ return true;
+ }
+ return searchInDirectory(fs.getHomeDirectory(), filename) != null;
+ }
+
+ /**
+ * Searches the given directory for the file.
+ *
+ * @param directory
+ * to search
+ * @param filename
+ * of file we want
+     * @return the path if the file exists in this directory; otherwise null.
+ */
+ public Path searchInDirectory(Path directory, String filename) {
+ //Search the files and folder in this Path to find the one matching the filename.
+ try {
+ RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true);
+ String[] parts;
+ Path path;
+ while (it.hasNext()) {
+ path = it.next().getPath();
+ parts = path.toString().split("/");
+ if (parts[parts.length - 1].equals(filename)) {
+ return path;
+ }
+ }
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ return null;
+ }
+
+ /**
+     * Locate the directory that holds the HDFS configuration files, falling back to the
+     * HADOOP_CONF_DIR system environment variable when no configuration path was supplied.
+     *
+     * @return true if it successfully finds the HDFS configuration directory
+ */
+ private boolean locateConf() {
+ if (this.conf_path == null) {
+ //As a last resort, try getting the configuration from the system environment
+ //Some systems won't have this set.
+ this.conf_path = System.getenv("HADOOP_CONF_DIR");
+ }
+ return this.conf_path != null;
+ }
+
+ /**
+     * Upload a file/directory to HDFS. Filepath is the path in the local file system; dir is the destination path.
+ *
+ * @param filepath
+ * @param dir
+ * @return
+ */
+ public boolean put(String filepath, String dir) {
+ if (this.fs != null) {
+ Path path = new Path(filepath);
+ Path dest = new Path(dir);
+ try {
+ if (fs.exists(dest)) {
+ fs.delete(dest, true); //recursive delete
+ }
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ try {
+ fs.copyFromLocalFile(path, dest);
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+     * Get an instance of the HDFS file system if it is configured correctly.
+ * Return null if there is no instance.
+ *
+ * @return
+ */
+ public FileSystem getFileSystem() {
+ if (locateConf()) {
+ conf.addResource(new Path(this.conf_path + "/core-site.xml"));
+ conf.addResource(new Path(this.conf_path + "/hdfs-site.xml"));
+ try {
+ fs = FileSystem.get(conf);
+ return this.fs;
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Could not locate HDFS configuration folder.");
+ }
+ }
+ return null;
+ }
+
+ /**
+     * Create a HashMap that has as key the hostname and as values the splits that belong to this hostname.
+ *
+ * @return
+ * @throws IOException
+ */
+ public HashMap<String, ArrayList<Integer>> getLocationsOfSplits() throws IOException {
+ HashMap<String, ArrayList<Integer>> splits_map = new HashMap<String, ArrayList<Integer>>();
+ ArrayList<Integer> temp;
+ int i = 0;
+ String hostname;
+ for (InputSplit s : this.splits) {
+ SplitLocationInfo info[] = s.getLocationInfo();
+ hostname = info[0].getLocation();
+ if (splits_map.containsKey(hostname)) {
+ temp = splits_map.get(hostname);
+ temp.add(i);
+ } else {
+ temp = new ArrayList<Integer>();
+ temp.add(i);
+ splits_map.put(hostname, temp);
+ }
+ i++;
+ }
+
+ return splits_map;
+ }
+
+ public void scheduleSplits() throws IOException, ParserConfigurationException, SAXException {
+ schedule = new HashMap<Integer, String>();
+ ArrayList<String> empty = new ArrayList<String>();
+ HashMap<String, ArrayList<Integer>> splits_map = this.getLocationsOfSplits();
+ readNodesFromXML();
+ int count = this.splits.size();
+
+ ArrayList<Integer> splits;
+ String node;
+ for (ArrayList<String> info : this.nodes) {
+ node = info.get(1);
+ if (splits_map.containsKey(node)) {
+ splits = splits_map.get(node);
+ for (Integer split : splits) {
+ schedule.put(split, node);
+ count--;
+ }
+ splits_map.remove(node);
+ } else {
+ empty.add(node);
+ }
+ }
+
+ //Check if every split got assigned to a node
+ if (count != 0) {
+ ArrayList<Integer> remaining = new ArrayList<Integer>();
+ // Find remaining splits
+ for (InputSplit s : this.splits) {
+ int i = 0;
+ if (!schedule.containsKey(i)) {
+ remaining.add(i);
+ }
+ }
+
+ if (empty.size() != 0) {
+ int node_number = 0;
+ for (int split : remaining) {
+ if (node_number == empty.size()) {
+ node_number = 0;
+ }
+ schedule.put(split, empty.get(node_number));
+ node_number++;
+ }
+ }
+ }
+ }
+
+ /**
+     * Collect the node id and network address of every node from the node controller infos.
+     * Save the information inside nodes.
+ *
+ * @throws ParserConfigurationException
+ * @throws IOException
+ * @throws SAXException
+ */
+ public void readNodesFromXML() throws ParserConfigurationException, SAXException, IOException {
+ DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder dBuilder;
+ dBuilder = dbFactory.newDocumentBuilder();
+ nodes = new ArrayList<ArrayList<String>>();
+ for (NodeControllerInfo ncInfo : nodeControllerInfos.values()) {
+ //Will this include the master node? Is that bad?
+ ArrayList<String> info = new ArrayList<String>();
+ info.add(ncInfo.getNodeId());
+ info.add(ncInfo.getNetworkAddress().getAddress());
+ nodes.add(info);
+ }
+ }
+
+ /**
+ * Writes the schedule to a temporary file, then uploads the file to the HDFS.
+ *
+ * @throws UnsupportedEncodingException
+ * @throws FileNotFoundException
+ */
+ public void addScheduleToDistributedCache() throws FileNotFoundException, UnsupportedEncodingException {
+ PrintWriter writer;
+ writer = new PrintWriter(filepath, "UTF-8");
+ for (int split : this.schedule.keySet()) {
+ writer.write(split + "," + this.schedule.get(split));
+ }
+ writer.close();
+ // Add file to HDFS
+ this.put(filepath, dfs_path);
+ }
+
+ public RecordReader getReader() {
+
+ List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+ for (int i = 0; i < splits.size(); i++) {
+ fileSplits.add((FileSplit) splits.get(i));
+ }
+ FileSplitsFactory splitsFactory;
+ try {
+ splitsFactory = new FileSplitsFactory(fileSplits);
+ List<FileSplit> inputSplits = splitsFactory.getSplits();
+ ContextFactory ctxFactory = new ContextFactory();
+ int size = inputSplits.size();
+ for (int i = 0; i < size; i++) {
+ /**
+ * read the split
+ */
+ TaskAttemptContext context;
+ try {
+ context = ctxFactory.createContext(job.getConfiguration(), i);
+ RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
+ reader.initialize(inputSplits.get(i), context);
+ return reader;
+ } catch (IOException | InterruptedException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ }
+ } catch (HyracksDataException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @return schedule.
+ */
+ public HashMap<Integer, String> getSchedule() {
+ return this.schedule;
+ }
+
+ /**
+ * Return the splits belonging to this node for the existing schedule.
+ *
+ * @param node
+ * @return
+ */
+ public ArrayList<Integer> getScheduleForNode(String node) {
+ ArrayList<Integer> node_schedule = new ArrayList<Integer>();
+ for (int split : this.schedule.keySet()) {
+ if (node.equals(this.schedule.get(split))) {
+ node_schedule.add(split);
+ }
+ }
+ return node_schedule;
+ }
+
+ public List<InputSplit> getSplits() {
+ return this.splits;
+ }
+
+ public Job getJob() {
+ return this.job;
+ }
+
+ public InputFormat getinputFormat() {
+ return this.inputFormat;
+ }
+
+ public Document convertStringToDocument(String xmlStr) {
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder builder;
+ try {
+ builder = factory.newDocumentBuilder();
+ Document doc = builder.parse(new InputSource(new StringReader(xmlStr)));
+ return doc;
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ return null;
+ }
+}
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
new file mode 100644
index 0000000..1d053b6
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.hdfs2;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * Reads records that are delimited by a specific begin/end tag.
+ */
+public class XmlCollectionWithTagInputFormat extends TextInputFormat {
+
+ public static String STARTING_TAG;
+ public static String ENDING_TAG;
+
+ @Override
+ public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+ try {
+ STARTING_TAG = context.getConfiguration().get("start_tag");
+ ENDING_TAG = context.getConfiguration().get("end_tag");
+ return new XmlRecordReader((FileSplit) split, context.getConfiguration());
+ } catch (IOException ioe) {
+ return null;
+ }
+ }
+
+ /**
+ * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified
+ * by the end tag
+ */
+ public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
+
+ private final byte[] end_tag;
+ private final byte[] start_tag;
+ private final long start;
+ private final long end;
+ private final FSDataInputStream fsin;
+ private final DataOutputBuffer buffer = new DataOutputBuffer();
+ private LongWritable currentKey;
+ private Text currentValue;
+ BlockLocation[] blocks;
+ public static byte[] nl = "\n".getBytes();
+
+ public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
+ end_tag = ENDING_TAG.getBytes(Charsets.UTF_8);
+ start_tag = STARTING_TAG.getBytes(Charsets.UTF_8);
+
+ // open the file and seek to the start of the split
+ start = split.getStart();
+ // set the end of the file
+ end = start + split.getLength();
+ Path file = split.getPath();
+ FileSystem fs = file.getFileSystem(conf);
+ FileStatus fStatus = fs.getFileStatus(file);
+ blocks = fs.getFileBlockLocations(fStatus, 0, fStatus.getLen());
+ // seek the start of file
+ fsin = fs.open(split.getPath());
+ fsin.seek(start);
+ }
+
+ /**
+ * Get next block item
+ *
+ * @param key
+ * @param value
+ * @return
+ * @throws IOException
+ */
+ private boolean next(LongWritable key, Text value) throws IOException {
+ if (fsin.getPos() < end) {
+ try {
+ if (readBlock(true)) {
+ key.set(fsin.getPos());
+ value.set(buffer.getData(), 0, buffer.getLength());
+ return true;
+ }
+ } finally {
+ buffer.reset();
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ Closeables.close(fsin, true);
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return (fsin.getPos() - start) / (float) (end - start);
+ }
+
+ /**
+ * Read the block from start till end and after that until you find a closing tag
+ *
+ * @param withinBlock
+ * @return
+ * @throws IOException
+ */
+ private boolean readBlock(boolean withinBlock) throws IOException {
+ boolean read = false;
+
+ while (true) {
+ if (fsin.getPos() < end) {
+ if (readUntilMatch(start_tag, false)) {
+ buffer.write(start_tag);
+ readUntilMatch(end_tag, true);
+ read = true;
+ }
+ } else {
+ return read;
+ }
+ }
+ }
+
+ /**
+         * Read from the block(s) until you reach the end of file or find bytes matching match[]
+ *
+ * @param match
+ * @param withinBlock
+ * @return
+ * @throws IOException
+ */
+ private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
+ int i = 0;
+ while (true) {
+ int b = fsin.read();
+ // end of file:
+ if (b == -1) {
+ return false;
+ }
+ // save to buffer:
+ if (withinBlock) {
+ buffer.write(b);
+ }
+
+ // check if we're matching:
+ if (b == match[i]) {
+ i++;
+ if (i >= match.length) {
+ return true;
+ }
+ } else {
+ i = 0;
+ }
+ // see if we've passed the stop point:
+ if (!withinBlock && i == 0 && fsin.getPos() >= end) {
+ return false;
+ }
+ }
+ }
+
+ private int nextBlock() throws IOException {
+ long pos = fsin.getPos();
+ long block_length;
+ for (int i = 0; i < blocks.length; i++) {
+ block_length = blocks[i].getOffset() + blocks[i].getLength();
+ if (pos == block_length) {
+ return i + 1;
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return currentKey;
+ }
+
+ @Override
+ public Text getCurrentValue() throws IOException, InterruptedException {
+ return currentValue;
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ currentKey = new LongWritable();
+ currentValue = new Text();
+ return next(currentKey, currentValue);
+ }
+ }
+}
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
index 62d1ca7..3c2d6aa 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
@@ -37,6 +37,7 @@
private String[] collectionPartitions;
private final List<Integer> childSeq;
private int totalDataSources;
+ private String tag;
private final Object[] types;
@@ -60,6 +61,7 @@
}
};
this.childSeq = new ArrayList<Integer>();
+ this.tag = null;
}
public int getTotalDataSources() {
@@ -77,7 +79,7 @@
public String[] getPartitions() {
return collectionPartitions;
}
-
+
public void setPartitions(String[] collectionPartitions) {
this.collectionPartitions = collectionPartitions;
}
@@ -86,6 +88,14 @@
return collectionPartitions.length;
}
+ public String getTag() {
+ return this.tag;
+ }
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+
@Override
public String getId() {
return collectionName;
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index d5966b8..b8dca63 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -16,15 +16,37 @@
*/
package org.apache.vxquery.metadata;
+import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.xml.parsers.ParserConfigurationException;
+
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameFieldAppender;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -38,10 +60,14 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.hdfs.ContextFactory;
+import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory;
import org.apache.vxquery.context.DynamicContext;
+import org.apache.vxquery.hdfs2.HDFSFunctions;
import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
import org.apache.vxquery.xmlparser.XMLParser;
+import org.xml.sax.SAXException;
public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -50,15 +76,23 @@
private String[] collectionPartitions;
private List<Integer> childSeq;
protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName());
+ private HDFSFunctions hdfs;
+ private String tag;
+ private final String START_TAG = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n";
+ private final String hdfsConf;
+ private final Map<String, NodeControllerInfo> nodeControllerInfos;
public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds,
- RecordDescriptor rDesc) {
+ RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) {
super(spec, 1, 1);
collectionPartitions = ds.getPartitions();
dataSourceId = (short) ds.getDataSourceId();
totalDataSources = (short) ds.getTotalDataSources();
childSeq = ds.getChildSeq();
recordDescriptors[0] = rDesc;
+ this.tag = ds.getTag();
+ this.hdfsConf = hdfsConf;
+ this.nodeControllerInfos = nodeControllerInfos;
}
@Override
@@ -83,31 +117,155 @@
public void open() throws HyracksDataException {
appender.reset(frame, true);
writer.open();
+ hdfs = new HDFSFunctions(nodeControllerInfos, hdfsConf);
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
fta.reset(buffer);
String collectionModifiedName = collectionName.replace("${nodeId}", nodeId);
- File collectionDirectory = new File(collectionModifiedName);
-
- // Go through each tuple.
- if (collectionDirectory.isDirectory()) {
- for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
- @SuppressWarnings("unchecked")
- Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(),
- TrueFileFilter.INSTANCE);
- while (it.hasNext()) {
- File xmlDocument = it.next();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath());
+ if (!collectionModifiedName.contains("hdfs:/")) {
+ File collectionDirectory = new File(collectionModifiedName);
+ //check if directory is in the local file system
+ if (collectionDirectory.exists()) {
+ // Go through each tuple.
+ if (collectionDirectory.isDirectory()) {
+ for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
+ Iterator<File> it = FileUtils.iterateFiles(collectionDirectory,
+ new VXQueryIOFileFilter(), TrueFileFilter.INSTANCE);
+ while (it.hasNext()) {
+ File xmlDocument = it.next();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath());
+ }
+ parser.parseElements(xmlDocument, writer, tupleIndex);
+ }
}
- parser.parseElements(xmlDocument, writer, tupleIndex);
+ } else {
+ throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":"
+ + collectionDirectory.getAbsolutePath() + ") passed to collection.");
}
}
} else {
- throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":"
- + collectionDirectory.getAbsolutePath() + ") passed to collection.");
+ // Else check in HDFS file system
+ // Get instance of the HDFS filesystem
+ FileSystem fs = hdfs.getFileSystem();
+ if (fs != null) {
+ collectionModifiedName = collectionModifiedName.replaceAll("hdfs:/", "");
+ Path directory = new Path(collectionModifiedName);
+ Path xmlDocument;
+ if (tag != null) {
+ hdfs.setJob(directory.toString(), tag);
+ tag = "<" + tag + ">";
+ Job job = hdfs.getJob();
+ InputFormat inputFormat = hdfs.getinputFormat();
+ try {
+ hdfs.scheduleSplits();
+ ArrayList<Integer> schedule = hdfs
+ .getScheduleForNode(InetAddress.getLocalHost().getHostAddress());
+ List<InputSplit> splits = hdfs.getSplits();
+ List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+ for (int i : schedule) {
+ fileSplits.add((FileSplit) splits.get(i));
+ }
+ FileSplitsFactory splitsFactory = new FileSplitsFactory(fileSplits);
+ List<FileSplit> inputSplits = splitsFactory.getSplits();
+ ContextFactory ctxFactory = new ContextFactory();
+ int size = inputSplits.size();
+ InputStream stream;
+ String value;
+ RecordReader reader;
+ TaskAttemptContext context;
+ for (int i = 0; i < size; i++) {
+ //read split
+ context = ctxFactory.createContext(job.getConfiguration(), i);
+ try {
+ reader = inputFormat.createRecordReader(inputSplits.get(i), context);
+ reader.initialize(inputSplits.get(i), context);
+ while (reader.nextKeyValue()) {
+ value = reader.getCurrentValue().toString();
+ //Split value if it contains more than one item with the tag
+ if (StringUtils.countMatches(value, tag) > 1) {
+ String items[] = value.split(tag);
+ for (String item : items) {
+ if (item.length() > 0) {
+ item = START_TAG + tag + item;
+ stream = new ByteArrayInputStream(
+ item.getBytes(StandardCharsets.UTF_8));
+ parser.parseHDFSElements(stream, writer, fta, i);
+ }
+ }
+ } else {
+ value = START_TAG + value;
+ //create an input stream to the file currently reading and send it to parser
+ stream = new ByteArrayInputStream(
+ value.getBytes(StandardCharsets.UTF_8));
+ parser.parseHDFSElements(stream, writer, fta, i);
+ }
+ }
+
+ } catch (InterruptedException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ }
+
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ } catch (ParserConfigurationException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ } catch (SAXException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ } else {
+ try {
+ //check if the path exists and is a directory
+ if (fs.exists(directory) && fs.isDirectory(directory)) {
+ for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
+ //read every file in the directory
+ RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true);
+ while (it.hasNext()) {
+ xmlDocument = it.next().getPath();
+ if (fs.isFile(xmlDocument)) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine(
+ "Starting to read XML document: " + xmlDocument.getName());
+ }
+ //create an input stream to the file currently reading and send it to parser
+ InputStream in = fs.open(xmlDocument).getWrappedStream();
+ parser.parseHDFSElements(in, writer, fta, tupleIndex);
+ }
+ }
+ }
+ } else {
+ throw new HyracksDataException("Invalid HDFS directory parameter (" + nodeId + ":"
+ + directory + ") passed to collection.");
+ }
+ } catch (FileNotFoundException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ }
+ try {
+ fs.close();
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(e.getMessage());
+ }
+ }
+ }
}
}
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
index 238f6d3..820c365 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
@@ -22,8 +22,6 @@
import java.util.List;
import java.util.Map;
-import org.apache.vxquery.context.StaticContext;
-
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -45,6 +43,7 @@
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -52,16 +51,22 @@
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import org.apache.vxquery.context.StaticContext;
public class VXQueryMetadataProvider implements IMetadataProvider<String, String> {
private final String[] nodeList;
private final Map<String, File> sourceFileMap;
private final StaticContext staticCtx;
+ private final String hdfsConf;
+ private final Map<String, NodeControllerInfo> nodeControllerInfos;
- public VXQueryMetadataProvider(String[] nodeList, Map<String, File> sourceFileMap, StaticContext staticCtx) {
+ public VXQueryMetadataProvider(String[] nodeList, Map<String, File> sourceFileMap, StaticContext staticCtx,
+ String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) {
this.nodeList = nodeList;
this.sourceFileMap = sourceFileMap;
this.staticCtx = staticCtx;
+ this.hdfsConf = hdfsConf;
+ this.nodeControllerInfos = nodeControllerInfos;
}
@Override
@@ -82,7 +87,7 @@
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
- throws AlgebricksException {
+ throws AlgebricksException {
VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) dataSource;
if (sourceFileMap != null) {
final int len = ds.getPartitions().length;
@@ -95,7 +100,8 @@
ds.setPartitions(collectionPartitions);
}
RecordDescriptor rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]);
- IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc);
+ IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf,
+ this.nodeControllerInfos);
AlgebricksPartitionConstraint constraint = getClusterLocations(nodeList, ds.getPartitionCount());
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, constraint);
@@ -129,7 +135,7 @@
@Override
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
- throws AlgebricksException {
+ throws AlgebricksException {
throw new UnsupportedOperationException();
}
@@ -155,7 +161,7 @@
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ throws AlgebricksException {
throw new UnsupportedOperationException();
}
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
index d8c4e68..d394bbc 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
@@ -19,10 +19,21 @@
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.vxquery.context.DynamicContext;
import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
import org.apache.vxquery.datamodel.accessors.TypedPointables;
@@ -33,6 +44,7 @@
import org.apache.vxquery.datamodel.values.ValueTag;
import org.apache.vxquery.exceptions.ErrorCode;
import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.hdfs2.HDFSFunctions;
import org.apache.vxquery.runtime.functions.arithmetic.AbstractArithmeticOperation;
import org.apache.vxquery.runtime.functions.cast.CastToDoubleOperation;
import org.apache.vxquery.runtime.functions.comparison.AbstractValueComparisonOperation;
@@ -42,14 +54,6 @@
import org.apache.vxquery.types.BuiltinTypeRegistry;
import org.apache.vxquery.xmlparser.XMLParser;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.DoublePointable;
-import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
public class FunctionHelper {
public static void arithmeticOperation(AbstractArithmeticOperation aOp, DynamicContext dCtx,
@@ -1213,8 +1217,33 @@
} catch (SystemException e) {
throw new HyracksDataException(e);
}
- File file = new File(fName);
- parser.parseDocument(file, abvs);
+ if (!fName.contains("hdfs:/")) {
+ File file = new File(fName);
+ if (file.exists()) {
+ parser.parseDocument(file, abvs);
+ }
+ }
+            // else: the path refers to HDFS, so look the document up in the HDFS file system
+ else {
+ fName = fName.replaceAll("hdfs:/", "");
+ HDFSFunctions hdfs = new HDFSFunctions(null, null);
+ FileSystem fs = hdfs.getFileSystem();
+ if (fs != null) {
+ Path xmlDocument = new Path(fName);
+ try {
+ if (fs.exists(xmlDocument)) {
+ InputStream in = fs.open(xmlDocument).getWrappedStream();
+ parser.parseHDFSDocument(in, abvs);
+ }
+ } catch (FileNotFoundException e) {
+                        // Document not found in HDFS: report the error and continue without output.
+ System.err.println(e);
+ } catch (IOException e) {
+                        // I/O failure while reading from HDFS: report the error and continue without output.
+ System.err.println(e);
+ }
+ }
+ }
}
public static boolean transformThenCompareMinMaxTaggedValues(AbstractValueComparisonOperation aOp,
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
index 77d500c..a62a26c 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
@@ -19,6 +19,7 @@
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
@@ -28,6 +29,7 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.vxquery.context.StaticContext;
import org.apache.vxquery.exceptions.VXQueryFileNotFoundException;
import org.apache.vxquery.exceptions.VXQueryParseException;
@@ -98,8 +100,7 @@
}
}
- public void parseElements(File file, IFrameWriter writer, int tupleIndex)
- throws HyracksDataException {
+ public void parseElements(File file, IFrameWriter writer, int tupleIndex) throws HyracksDataException {
try {
Reader input;
if (bufferSize > 0) {
@@ -126,4 +127,48 @@
}
}
+ public void parseHDFSElements(InputStream inputStream, IFrameWriter writer, FrameTupleAccessor fta, int tupleIndex)
+ throws IOException {
+ try {
+ Reader input;
+ if (bufferSize > 0) {
+ input = new BufferedReader(new InputStreamReader(inputStream), bufferSize);
+ } else {
+ input = new InputStreamReader(inputStream);
+ }
+ in.setCharacterStream(input);
+ handler.setupElementWriter(writer, tupleIndex);
+ parser.parse(in);
+ input.close();
+ } catch (IOException e) {
+ HyracksDataException hde = new HyracksDataException(e);
+ hde.setNodeId(nodeId);
+ throw hde;
+ } catch (SAXException e) {
+            // Malformed XML input: print the stack trace; no further elements are written.
+ e.printStackTrace();
+ }
+ }
+
+ public void parseHDFSDocument(InputStream inputStream, ArrayBackedValueStorage abvs) throws HyracksDataException {
+ try {
+ Reader input;
+ if (bufferSize > 0) {
+ input = new BufferedReader(new InputStreamReader(inputStream), bufferSize);
+ } else {
+ input = new InputStreamReader(inputStream);
+ }
+ in.setCharacterStream(input);
+ parser.parse(in);
+ handler.writeDocument(abvs);
+ input.close();
+ } catch (IOException e) {
+ HyracksDataException hde = new HyracksDataException(e);
+ hde.setNodeId(nodeId);
+ throw hde;
+ } catch (SAXException e) {
+            // Malformed XML input: print the stack trace; the document result is left unwritten.
+ e.printStackTrace();
+ }
+ }
}
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
index e42caba..8a044ea 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
@@ -17,28 +17,7 @@
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
-
-import org.apache.vxquery.compiler.CompilerControlBlock;
-import org.apache.vxquery.compiler.algebricks.VXQueryBinaryBooleanInspectorFactory;
-import org.apache.vxquery.compiler.algebricks.VXQueryBinaryIntegerInspectorFactory;
-import org.apache.vxquery.compiler.algebricks.VXQueryComparatorFactoryProvider;
-import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
-import org.apache.vxquery.compiler.algebricks.VXQueryExpressionRuntimeProvider;
-import org.apache.vxquery.compiler.algebricks.VXQueryNullWriterFactory;
-import org.apache.vxquery.compiler.algebricks.VXQueryPrinterFactoryProvider;
-import org.apache.vxquery.compiler.algebricks.prettyprint.VXQueryLogicalExpressionPrettyPrintVisitor;
-import org.apache.vxquery.compiler.rewriter.RewriteRuleset;
-import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext;
-import org.apache.vxquery.exceptions.ErrorCode;
-import org.apache.vxquery.exceptions.SystemException;
-import org.apache.vxquery.metadata.VXQueryMetadataProvider;
-import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFactoryProvider;
-import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFamilyProvider;
-import org.apache.vxquery.types.BuiltinTypeRegistry;
-import org.apache.vxquery.types.Quantifier;
-import org.apache.vxquery.types.SequenceType;
-import org.apache.vxquery.xmlquery.ast.ModuleNode;
-import org.apache.vxquery.xmlquery.translator.XMLQueryTranslator;
+import java.util.Map;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
@@ -67,16 +46,40 @@
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.vxquery.compiler.CompilerControlBlock;
+import org.apache.vxquery.compiler.algebricks.VXQueryBinaryBooleanInspectorFactory;
+import org.apache.vxquery.compiler.algebricks.VXQueryBinaryIntegerInspectorFactory;
+import org.apache.vxquery.compiler.algebricks.VXQueryComparatorFactoryProvider;
+import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
+import org.apache.vxquery.compiler.algebricks.VXQueryExpressionRuntimeProvider;
+import org.apache.vxquery.compiler.algebricks.VXQueryNullWriterFactory;
+import org.apache.vxquery.compiler.algebricks.VXQueryPrinterFactoryProvider;
+import org.apache.vxquery.compiler.algebricks.prettyprint.VXQueryLogicalExpressionPrettyPrintVisitor;
+import org.apache.vxquery.compiler.rewriter.RewriteRuleset;
+import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext;
+import org.apache.vxquery.exceptions.ErrorCode;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.metadata.VXQueryMetadataProvider;
+import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFactoryProvider;
+import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFamilyProvider;
+import org.apache.vxquery.types.BuiltinTypeRegistry;
+import org.apache.vxquery.types.Quantifier;
+import org.apache.vxquery.types.SequenceType;
+import org.apache.vxquery.xmlquery.ast.ModuleNode;
+import org.apache.vxquery.xmlquery.translator.XMLQueryTranslator;
public class XMLQueryCompiler {
private final XQueryCompilationListener listener;
private final ICompilerFactory cFactory;
+ private final String hdfsConf;
+
private LogicalOperatorPrettyPrintVisitor pprinter;
private ModuleNode moduleNode;
@@ -87,17 +90,27 @@
private int frameSize;
+ private Map<String, NodeControllerInfo> nodeControllerInfos;
+
private String[] nodeList;
- public XMLQueryCompiler(XQueryCompilationListener listener, String[] nodeList, int frameSize) {
- this(listener, nodeList, frameSize, -1, -1, -1);
+ public XMLQueryCompiler(XQueryCompilationListener listener, Map<String, NodeControllerInfo> nodeControllerInfos,
+ int frameSize, String hdfsConf) {
+ this(listener, nodeControllerInfos, frameSize, -1, -1, -1, hdfsConf);
}
- public XMLQueryCompiler(XQueryCompilationListener listener, String[] nodeList, int frameSize,
- int availableProcessors, long joinHashSize, long maximumDataSize) {
+ public XMLQueryCompiler(XQueryCompilationListener listener, Map<String, NodeControllerInfo> nodeControllerInfos,
+ int frameSize) {
+ this(listener, nodeControllerInfos, frameSize, -1, -1, -1, "");
+ }
+
+ public XMLQueryCompiler(XQueryCompilationListener listener, Map<String, NodeControllerInfo> nodeControllerInfos,
+ int frameSize, int availableProcessors, long joinHashSize, long maximumDataSize, String hdfsConf) {
this.listener = listener == null ? NoopXQueryCompilationListener.INSTANCE : listener;
this.frameSize = frameSize;
- this.nodeList = nodeList;
+ this.nodeControllerInfos = nodeControllerInfos;
+ setNodeList();
+ this.hdfsConf = hdfsConf;
HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(
new IOptimizationContextFactory() {
@Override
@@ -116,12 +129,12 @@
builder.getPhysicalOptimizationConfig().setMaxFramesHybridHash((int) (joinHashSize / this.frameSize));
}
if (maximumDataSize > 0) {
- builder.getPhysicalOptimizationConfig().setMaxFramesLeftInputHybridHash(
- (int) (maximumDataSize / this.frameSize));
+ builder.getPhysicalOptimizationConfig()
+ .setMaxFramesLeftInputHybridHash((int) (maximumDataSize / this.frameSize));
}
- builder.getPhysicalOptimizationConfig().setMaxFramesLeftInputHybridHash(
- (int) (60L * 1024 * 1048576 / this.frameSize));
+ builder.getPhysicalOptimizationConfig()
+ .setMaxFramesLeftInputHybridHash((int) (60L * 1024 * 1048576 / this.frameSize));
builder.setLogicalRewrites(buildDefaultLogicalRewrites());
builder.setPhysicalRewrites(buildDefaultPhysicalRewrites());
@@ -192,15 +205,26 @@
cFactory = builder.create();
}
+ /**
+ * Set Configuration of node controllers as array of Strings.
+ */
+ private void setNodeList() {
+ nodeList = new String[nodeControllerInfos.size()];
+ int index = 0;
+ for (String node : nodeControllerInfos.keySet()) {
+ nodeList[index++] = node;
+ }
+ }
+
public void compile(String name, Reader query, CompilerControlBlock ccb, int optimizationLevel)
throws SystemException {
moduleNode = XMLQueryParser.parse(name, query);
listener.notifyParseResult(moduleNode);
module = new XMLQueryTranslator(ccb).translateModule(moduleNode);
- pprinter = new LogicalOperatorPrettyPrintVisitor(new VXQueryLogicalExpressionPrettyPrintVisitor(
- module.getModuleContext()));
+ pprinter = new LogicalOperatorPrettyPrintVisitor(
+ new VXQueryLogicalExpressionPrettyPrintVisitor(module.getModuleContext()));
VXQueryMetadataProvider mdProvider = new VXQueryMetadataProvider(nodeList, ccb.getSourceFileMap(),
- module.getModuleContext());
+ module.getModuleContext(), this.hdfsConf, nodeControllerInfos);
compiler = cFactory.createCompiler(module.getBody(), mdProvider, 0);
listener.notifyTranslationResult(module);
XMLQueryTypeChecker.typeCheckModule(module);
diff --git a/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java b/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java
index 165b330..13ca921 100644
--- a/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java
+++ b/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java
@@ -20,16 +20,19 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
import java.util.zip.GZIPInputStream;
-import junit.framework.Assert;
-
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.vxquery.compiler.CompilerControlBlock;
import org.apache.vxquery.context.RootStaticContextImpl;
import org.apache.vxquery.context.StaticContextImpl;
import org.junit.Test;
-import org.apache.hyracks.api.dataset.ResultSetId;
+import junit.framework.Assert;
public class SimpleXQueryTest {
@Test
@@ -96,8 +99,8 @@
private static String gunzip(String dir, String filename) {
try {
- GZIPInputStream in = new GZIPInputStream(new BufferedInputStream(new FileInputStream(new File(dir
- + filename + ".gz"))));
+ GZIPInputStream in = new GZIPInputStream(
+ new BufferedInputStream(new FileInputStream(new File(dir + filename + ".gz"))));
File temp = File.createTempFile("vxquery", filename);
temp.deleteOnExit();
FileOutputStream out = new FileOutputStream(temp);
@@ -133,7 +136,11 @@
}
private static void runTestInternal(String testName, String query) throws Exception {
- XMLQueryCompiler compiler = new XMLQueryCompiler(null, new String[] { "nc1" }, 65536);
+
+ Map<String, NodeControllerInfo> nodeControllerInfos = new HashMap<String, NodeControllerInfo>();
+ nodeControllerInfos.put("nc1", new NodeControllerInfo("nc1", null, new NetworkAddress("127.0.0.1", 0), null));
+
+ XMLQueryCompiler compiler = new XMLQueryCompiler(null, nodeControllerInfos, 65536);
CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE),
new ResultSetId(System.nanoTime()), null);
compiler.compile(testName, new StringReader(query), ccb, Integer.MAX_VALUE);
diff --git a/vxquery-server/src/main/resources/conf/cluster.properties b/vxquery-server/src/main/resources/conf/cluster.properties
index fd015d4..6339fd9 100644
--- a/vxquery-server/src/main/resources/conf/cluster.properties
+++ b/vxquery-server/src/main/resources/conf/cluster.properties
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,4 +51,4 @@
NCJAVA_OPTS="-server -Xmx7G -Djava.util.logging.config.file=./vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/testing_logging.properties"
# debug option: NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
# Yourkit option: -agentpath:/tools/yjp-2014-build-14114/bin/linux-x86-64/libyjpagent.so=port=20001"
-# Yourkit mac option: -agentpath:/Applications/YourKit_Java_Profiler.app/bin/mac/libyjpagent.jnilib=sampling
+# Yourkit mac option: -agentpath:/Applications/YourKit_Java_Profiler.app/bin/mac/libyjpagent.jnilib=sampling
\ No newline at end of file
diff --git a/vxquery-xtest/pom.xml b/vxquery-xtest/pom.xml
index 0c563e7..4cb7c60 100644
--- a/vxquery-xtest/pom.xml
+++ b/vxquery-xtest/pom.xml
@@ -161,6 +161,21 @@
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-hdfs-2.x</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<scope>compile</scope>
diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/MiniDFS.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/MiniDFS.java
new file mode 100644
index 0000000..0720baa
--- /dev/null
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/MiniDFS.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.xtest;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.mapred.JobConf;
+
+public class MiniDFS {
+
+ private MiniDFSCluster dfsCluster;
+
+ public void miniDFS() {
+
+ }
+
+ public void startHDFS() throws IOException {
+
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ JobConf conf = new JobConf();
+ String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ int numDataNodes = 1;
+ int nameNodePort = 40000;
+
+ // cleanup artifacts created on the local file system
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ MiniDFSCluster.Builder build = new MiniDFSCluster.Builder(conf);
+ build.nameNodePort(nameNodePort);
+ build.nameNodeHttpPort(nameNodePort + 34);
+ build.numDataNodes(numDataNodes);
+ build.checkExitOnShutdown(true);
+ build.startupOption(StartupOption.REGULAR);
+ build.format(true);
+ build.waitSafeMode(true);
+ dfsCluster = build.build();
+
+ FileSystem dfs = FileSystem.get(conf);
+ String DATA_PATH = "src/test/resources/TestSources/ghcnd";
+ Path src = new Path(DATA_PATH);
+ dfs.mkdirs(new Path("/tmp"));
+ Path dest = new Path("/tmp/vxquery-hdfs-test");
+ dfs.copyFromLocalFile(src, dest);
+ if (dfs.exists(dest)) {
+ System.err.println("Test files copied to HDFS successfully");
+ }
+ }
+
+ public void shutdownHDFS() {
+        System.err.println("Tests completed. Shutting down HDFS");
+ dfsCluster.shutdown();
+ }
+}
diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java
index 18db5c1..5b6ddff 100644
--- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java
@@ -18,13 +18,16 @@
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.Reader;
+import java.net.InetAddress;
import java.util.EnumSet;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -63,6 +66,7 @@
private NodeControllerService nc1;
private IHyracksClientConnection hcc;
private IHyracksDataset hds;
+ private final String publicAddress = InetAddress.getLocalHost().getHostAddress();
public TestRunner(XTestOptions opts) throws Exception {
this.opts = opts;
@@ -70,9 +74,9 @@
public void open() throws Exception {
CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetIpAddress = publicAddress;
ccConfig.clientNetPort = 39000;
- ccConfig.clusterNetIpAddress = "127.0.0.1";
+ ccConfig.clusterNetIpAddress = publicAddress;
ccConfig.clusterNetPort = 39001;
ccConfig.profileDumpPeriod = 10000;
File outDir = new File("target/ClusterController");
@@ -87,9 +91,9 @@
NCConfig ncConfig1 = new NCConfig();
ncConfig1.ccHost = "localhost";
ncConfig1.ccPort = 39001;
- ncConfig1.clusterNetIPAddress = "127.0.0.1";
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.resultIPAddress = "127.0.0.1";
+ ncConfig1.clusterNetIPAddress = publicAddress;
+ ncConfig1.dataIPAddress = publicAddress;
+ ncConfig1.resultIPAddress = publicAddress;
ncConfig1.nodeId = "nc1";
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -115,7 +119,14 @@
VXQueryCompilationListener listener = new VXQueryCompilationListener(opts.showAST, opts.showTET,
opts.showOET, opts.showRP);
- XMLQueryCompiler compiler = new XMLQueryCompiler(listener, new String[] { "nc1" }, opts.frameSize);
+
+ Map<String, NodeControllerInfo> nodeControllerInfos = null;
+ if (hcc != null) {
+ nodeControllerInfos = hcc.getNodeControllerInfos();
+ }
+
+ XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeControllerInfos, opts.frameSize,
+ opts.hdfsConf);
Reader in = new InputStreamReader(new FileInputStream(testCase.getXQueryFile()), "UTF-8");
CompilerControlBlock ccb = new CompilerControlBlock(
new StaticContextImpl(RootStaticContextImpl.INSTANCE),
diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java
index 854cbf8..496b74a 100644
--- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java
@@ -76,4 +76,7 @@
@Option(name = "-showresult", usage = "Show query result.")
boolean showResult;
+
+ @Option(name = "-hdfs-conf", usage = "Directory path to Hadoop configuration files")
+ String hdfsConf;
}
diff --git a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
index 12a91a9..5411215 100644
--- a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
+++ b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
@@ -37,6 +37,7 @@
opts.threads = 1;
opts.showQuery = true;
opts.showResult = true;
+ opts.hdfsConf = "src/test/resources/hadoop/conf";
return opts;
}
diff --git a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/VXQueryTest.java b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/VXQueryTest.java
index 3704d07..4d0ddc0 100644
--- a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/VXQueryTest.java
+++ b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/VXQueryTest.java
@@ -17,15 +17,19 @@
package org.apache.vxquery.xtest;
import java.io.File;
+import java.io.IOException;
import java.util.Collection;
import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class VXQueryTest extends AbstractXQueryTest {
+ private static MiniDFS dfs;
private static String VXQUERY_CATALOG = StringUtils.join(new String[] { "src", "test", "resources",
"VXQueryCatalog.xml" }, File.separator);
@@ -52,4 +56,19 @@
return getOptions();
}
+ @BeforeClass
+ public static void setupHDFS() {
+ dfs = new MiniDFS();
+ try {
+ dfs.startHDFS();
+ } catch (IOException e) {
+ System.err.println(e);
+ }
+ }
+
+ @AfterClass
+ public static void shutdownHDFS() {
+ dfs.shutdownHDFS();
+ }
+
}
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/avgHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/avgHDFS.txt
new file mode 100644
index 0000000..7ef6ffe
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/avgHDFS.txt
@@ -0,0 +1 @@
+12.5
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/countHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/countHDFS.txt
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/countHDFS.txt
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxHDFS.txt
new file mode 100644
index 0000000..dc7b54a
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxHDFS.txt
@@ -0,0 +1 @@
+33
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxvalueHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxvalueHDFS.txt
new file mode 100644
index 0000000..e37d32a
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxvalueHDFS.txt
@@ -0,0 +1 @@
+1000
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/minHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/minHDFS.txt
new file mode 100644
index 0000000..ea1acb6
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/minHDFS.txt
@@ -0,0 +1 @@
+11.25
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/sumHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/sumHDFS.txt
new file mode 100644
index 0000000..2b82dfe
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/sumHDFS.txt
@@ -0,0 +1 @@
+60
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/avgHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/avgHDFS.xq
new file mode 100644
index 0000000..3214b97
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/avgHDFS.xq
@@ -0,0 +1,25 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the average minimum temperature. :)
+fn:avg(
+ let $collection := "hdfs://tmp/vxquery-hdfs-test"
+ for $r in collection($collection)/dataCollection/data
+ where $r/dataType eq "TMIN"
+ return $r/value
+)
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/countHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/countHDFS.xq
new file mode 100644
index 0000000..7940b03
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/countHDFS.xq
@@ -0,0 +1,25 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the number of wind sensor readings. :)
+fn:count(
+ let $collection := "hdfs://tmp/vxquery-hdfs-test"
+ for $r in collection($collection)/dataCollection/data
+ where $r/dataType eq "AWND"
+ return $r/value
+)
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxHDFS.xq
new file mode 100644
index 0000000..0db1980
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxHDFS.xq
@@ -0,0 +1,25 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the highest max temperature. :)
+fn:max(
+ let $collection := "hdfs://tmp/vxquery-hdfs-test"
+ for $r in collection($collection)/dataCollection/data
+ where $r/dataType eq "TMAX"
+ return $r/value
+)
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq
new file mode 100644
index 0000000..9d04916
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq
@@ -0,0 +1,23 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the max value. :)
+fn:max(
+ for $r in collection-with-tag("hdfs://tmp/vxquery-hdfs-test/half_1/quarter_1/sensors", "data")/data
+ return $r/value
+)
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/minHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/minHDFS.xq
new file mode 100644
index 0000000..869e222
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/minHDFS.xq
@@ -0,0 +1,25 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the lowest min temperature. :)
+fn:min(
+ let $collection := "hdfs://tmp/vxquery-hdfs-test"
+ for $r in collection($collection)/dataCollection/data
+ where $r/dataType eq "TMIN"
+ return $r/value
+)
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/sumHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/sumHDFS.xq
new file mode 100644
index 0000000..1fdf743
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/sumHDFS.xq
@@ -0,0 +1,25 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the total precipitation. :)
+fn:sum(
+ let $collection := "hdfs://tmp/vxquery-hdfs-test"
+ for $r in collection($collection)/dataCollection/data
+ where $r/dataType eq "PRCP"
+ return $r/value
+)
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/VXQueryCatalog.xml b/vxquery-xtest/src/test/resources/VXQueryCatalog.xml
index 6f7acee..f75ce49 100644
--- a/vxquery-xtest/src/test/resources/VXQueryCatalog.xml
+++ b/vxquery-xtest/src/test/resources/VXQueryCatalog.xml
@@ -40,6 +40,8 @@
<!ENTITY SingleQuery SYSTEM "cat/SingleQuery.xml">
<!ENTITY SingleAlternateQuery SYSTEM "cat/SingleAlternateQuery.xml">
+<!ENTITY HDFSAggregateQueries SYSTEM "cat/HDFSAggregateQueries.xml">
+
]>
<test-suite xmlns="http://www.w3.org/2005/02/query-test-XQTSCatalog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
@@ -193,4 +195,17 @@
&GhcndRecordsPartition4Queries;
</test-group>
</test-group>
+ <test-group name="HDFSAggregateQueries" featureOwner="Efi Kaltirimidou">
+ <GroupInfo>
+ <title>Aggregate Partition Queries in HDFS</title>
+ <description/>
+ </GroupInfo>
+ <test-group name="CollectionReadFromHDFSAggregateTests" featureOwner="Efi Kaltirimidou">
+ <GroupInfo>
+ <title>Aggregate HDFS Execution Tests</title>
+ <description/>
+ </GroupInfo>
+ &HDFSAggregateQueries;
+ </test-group>
+ </test-group>
</test-suite>
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/cat/HDFSAggregateQueries.xml b/vxquery-xtest/src/test/resources/cat/HDFSAggregateQueries.xml
new file mode 100644
index 0000000..4f925a0
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/cat/HDFSAggregateQueries.xml
@@ -0,0 +1,53 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<test-group xmlns="http://www.w3.org/2005/02/query-test-XQTSCatalog" name="AggregateQueries" featureOwner="VXQuery">
+ <GroupInfo>
+ <title>HDFS Aggregate</title>
+ <description/>
+ </GroupInfo>
+ <test-case name="hdfs-aggregate-avg" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+ <description>Find the average minimum temperature from records in HDFS, based on q02 from the weather benchmark.</description>
+ <query name="avgHDFS" date="2015-06-11"/>
+ <output-file compare="Text">avgHDFS.txt</output-file>
+ </test-case>
+ <test-case name="hdfs-aggregate-count" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+ <description>Count records in HDFS returned for q02 from the weather benchmark.</description>
+ <query name="countHDFS" date="2015-06-11"/>
+ <output-file compare="Text">countHDFS.txt</output-file>
+ </test-case>
+ <test-case name="hdfs-aggregate-min" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+ <description>Find the lowest minimum temperature from records in HDFS, based on q05 from the weather benchmark.</description>
+ <query name="minHDFS" date="2015-06-11"/>
+ <output-file compare="Text">minHDFS.txt</output-file>
+ </test-case>
+ <test-case name="hdfs-aggregate-max" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+ <description>Find the highest maximum temperature from records in HDFS, based on q07 from the weather benchmark.</description>
+ <query name="maxHDFS" date="2015-06-11"/>
+ <output-file compare="Text">maxHDFS.txt</output-file>
+ </test-case>
+ <test-case name="hdfs-aggregate-sum" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+ <description>Find the total precipitation from records in HDFS, based on q03 from the weather benchmark.</description>
+ <query name="sumHDFS" date="2015-06-11"/>
+ <output-file compare="Text">sumHDFS.txt</output-file>
+ </test-case>
+ <test-case name="hdfs-aggregate-max-value" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+ <description>Find the maximum value from a tagged collection read from a subdirectory in HDFS.</description>
+ <query name="maxvalueHDFS" date="2015-10-17"/>
+ <output-file compare="Text">maxvalueHDFS.txt</output-file>
+ </test-case>
+</test-group>
diff --git a/vxquery-xtest/src/test/resources/hadoop/conf/core-site.xml b/vxquery-xtest/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..b1f576c
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+ <name>fs.default.name</name>
+ <value>hdfs://localhost:40000</value>
+</property>
+<property>
+ <name>hadoop.tmp.dir</name>
+ <value>/tmp/hadoop</value>
+</property>
+
+
+</configuration>
diff --git a/vxquery-xtest/src/test/resources/hadoop/conf/hdfs-site.xml b/vxquery-xtest/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..6b6604d
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+ <name>dfs.replication</name>
+ <value>1</value>
+</property>
+
+<property>
+ <name>dfs.block.size</name>
+ <value>1048576</value>
+</property>
+
+</configuration>
diff --git a/vxquery-xtest/src/test/resources/hadoop/conf/mapred-site.xml b/vxquery-xtest/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..e15ec74
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>localhost:29007</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.map.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.reduce.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.max.split.size</name>
+ <value>128</value>
+ </property>
+
+</configuration>