Merge branch 'master' into steven/hdfs
diff --git a/pom.xml b/pom.xml
index 8a97a92..4cabe8e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,8 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -295,6 +296,25 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.hyracks</groupId>
+                <artifactId>hyracks-hdfs-2.x</artifactId>
+                <version>${hyracks.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.hyracks</groupId>
+                <artifactId>hyracks-hdfs-core</artifactId>
+                <version>${hyracks.version}</version>
+                <type>jar</type>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-hdfs</artifactId>
+                <version>2.7.0</version>
+            </dependency>
+
+            <dependency>
                 <groupId>ant</groupId>
                 <artifactId>ant-trax</artifactId>
                 <version>1.6.5</version>
@@ -340,7 +360,7 @@
             <dependency>
                 <groupId>commons-io</groupId>
                 <artifactId>commons-io</artifactId>
-                <version>1.3.2</version>
+                <version>2.4</version>
             </dependency>
 
             <dependency>
@@ -359,9 +379,10 @@
             <dependency>
                 <groupId>org.mortbay.jetty</groupId>
                 <artifactId>jetty</artifactId>
-                <version>6.1.4</version>
+                <version>6.1.22</version>
                 <scope>compile</scope>
             </dependency>
+
         </dependencies>
     </dependencyManagement>
 
@@ -437,19 +458,21 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-assembly-plugin</artifactId>
-        <!-- We override the configuration plugin to override the descriptor to use for building
-          the source release zip. Specifically, we would like to control the inclusions/exclusions.
-          For example, we exclude the KEYS file from the zip -->
+                <!-- We override the configuration plugin to override the descriptor 
+                    to use for building the source release zip. Specifically, we would like to 
+                    control the inclusions/exclusions. For example, we exclude the KEYS file 
+                    from the zip -->
                 <executions>
                     <execution>
-            <!-- Use this id to match the id mentioned in the assembly plugin configuration in
-              the apache parent POM under the apache-release profile -->
+                        <!-- Use this id to match the id mentioned in the assembly plugin configuration 
+                            in the apache parent POM under the apache-release profile -->
                         <id>source-release-assembly</id>
                         <phase>package</phase>
                         <goals>
                             <goal>single</goal>
                         </goals>
-            <!-- combine.self should be override to replace the configuration in the parent POM -->
+                        <!-- combine.self should be overridden to replace the configuration in 
+                            the parent POM -->
                         <configuration combine.self="override">
                             <runOnlyAtExecutionRoot>true</runOnlyAtExecutionRoot>
                             <descriptors>
@@ -491,6 +514,29 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-site-plugin</artifactId>
             </plugin>
+            <plugin>
+                <groupId>com.google.code.maven-replacer-plugin</groupId>
+                <artifactId>replacer</artifactId>
+                <version>1.5.3</version>
+                <executions>
+                    <execution>
+                        <phase>prepare-package</phase>
+                        <goals>
+                            <goal>replace</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <ignoreMissingFile>true</ignoreMissingFile>
+                    <file>vxquery-server/src/main/resources/conf/cluster.properties</file>
+                    <outputFile>vxquery-server/src/main/resources/conf/cluster.properties</outputFile>
+                    <regex>false</regex>
+                    <token>$CONF_PATH$</token>
+                    <value>${basedir}/vxquery-xtest/src/test/resources/hadoop/conf</value>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 
@@ -541,6 +587,7 @@
                     <maxmemory>2g</maxmemory>
                 </configuration>
             </plugin>
+            
         </plugins>
     </reporting>
 
diff --git a/src/site/apt/user_query.apt b/src/site/apt/user_query.apt
index 8ea6429..c5132c3 100644
--- a/src/site/apt/user_query.apt
+++ b/src/site/apt/user_query.apt
@@ -49,6 +49,7 @@
 -showrp                    : Show Runtime plan
 -showtet                   : Show translated expression tree
 -timing                    : Produce timing information
+-hdfs-conf VAL             : The folder containing the HDFS configuration files
 ----------------------------------------
 
 * Java Options
diff --git a/src/site/apt/user_query_hdfs.apt b/src/site/apt/user_query_hdfs.apt
new file mode 100644
index 0000000..fa736b8
--- /dev/null
+++ b/src/site/apt/user_query_hdfs.apt
@@ -0,0 +1,180 @@
+~~ Licensed to the Apache Software Foundation (ASF) under one or more
+~~ contributor license agreements.  See the NOTICE file distributed with
+~~ this work for additional information regarding copyright ownership.
+~~ The ASF licenses this file to You under the Apache License, Version 2.0
+~~ (the "License"); you may not use this file except in compliance with
+~~ the License.  You may obtain a copy of the License at
+~~
+~~     http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License.
+
+Executing a Query in HDFS
+
+
+* 1. Connecting VXQuery with HDFS
+
+  In order to read HDFS data, VXQuery needs access to the HDFS configuration
+  directory, which contains:
+  
+    core-site.xml
+    hdfs-site.xml
+    mapred-site.xml
+    
+  Some systems set this directory automatically in the "HADOOP_CONF_DIR" environment
+  variable. If that is the case, VXQuery will retrieve it automatically when
+  executing HDFS queries.
+  
+  When this variable is not set, users need to provide the directory as a command
+  line option when executing VXQuery:
+    -hdfs-conf /path/to/hdfs/conf_folder
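+
+  For example (a hypothetical layout; substitute your own Hadoop configuration
+  directory), either of the following works:
+
+----------
+# make the configuration visible through the environment variable
+export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
+
+# or pass the directory explicitly as a command line option
+-hdfs-conf /usr/local/hadoop/etc/hadoop
+----------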
+
+
+* 2. Running the Query
+
+
+  There are two ways to access files stored in HDFS from VXQuery.
+
+
+  [[a]] Reading them as whole files.
+
+
+  [[b]] Reading them block by block.
+
+
+** a. Reading them as whole files.
+
+  For this option you only need to change the file path. To indicate that your 
+  file(s) should be read from HDFS, add <hdfs://> in front of the path. VXQuery 
+  will read the path of the files you request in your query and try to locate them.
+
+
+  So, in order to run a query that reads its input files from HDFS, you need 
+  to make sure that:
+
+
+  a) The "HADOOP_CONF_DIR" environment variable is set, or you pass the 
+  configuration directory with -hdfs-conf.
+
+
+  b) The path defined in your query begins with <hdfs://> followed by the full 
+  path to the file(s).
+
+
+  c) The path exists on HDFS and the user that runs the query has read permission 
+  on these files.
+
+
+*** Example
+
+  I want to find all the <books> that are published after 2004.
+
+
+  The file is located in HDFS at the path </user/hduser/store/books.xml>.
+
+
+  My query will look like this:
+
+
+----------
+for $x in collection("hdfs://user/hduser/store")
+where $x/year>2004
+return $x/title
+----------
+
+
+  If I want only one file, <<books.xml>>, to be parsed from HDFS, my query will 
+  look like this:
+
+
+----------
+for $x in doc("hdfs://user/hduser/store/books.xml")
+where $x/year>2004
+return $x/title
+----------
+
+
+** b. Reading them block by block
+
+
+  To use this option you need to modify your query. Instead of using the 
+  <collection> or <doc> function to define your input file(s), use 
+  <collection-with-tag>.
+
+
+  <collection-with-tag> accepts two arguments: the first is the path to the HDFS 
+  directory where your input files are stored, and the second is a specific <<tag>> 
+  that exists in the input file(s). This is the tag of the element that contains 
+  the fields your query is looking for.
+
+  Other than these arguments, you do not need to change anything else in the query.
+  
+  Note: since this strategy is optimized to read block by block, the result will 
+  include all elements with the given tag, regardless of their depth in the XML tree.
+
+
+*** Example
+  
+  The same example, using <<collection-with-tag>>.
+
+  My input file <books.xml>:
+
+-----------------------------
+<?xml version="1.0" encoding="UTF-8"?>
+<bookstore>
+
+<book>
+  <title lang="en">Everyday Italian</title>
+  <author>Giada De Laurentiis</author>
+  <year>2005</year>
+  <price>30.00</price>
+</book>
+
+<book>
+  <title lang="en">Harry Potter</title>
+  <author>J K. Rowling</author>
+  <year>2005</year>
+  <price>29.99</price>
+</book>
+
+<book>
+  <title lang="en">XQuery Kick Start</title>
+  <author>James McGovern</author>
+  <author>Per Bothner</author>
+  <author>Kurt Cagle</author>
+  <author>James Linn</author>
+  <author>Vaidyanathan Nagarajan</author>
+  <year>2003</year>
+  <price>49.99</price>
+</book>
+
+<book>
+  <title lang="en">Learning XML</title>
+  <author>Erik T. Ray</author>
+  <year>2003</year>
+  <price>39.95</price>
+</book>
+
+</bookstore>
+----------------------------
+
+
+  My query will look like this:
+
+
+----------------------------
+for $x in collection-with-tag("hdfs://user/hduser/store","book")/book
+where $x/year>2004
+return $x/title
+----------------------------
+
+
+  Notice that I defined the path to the directory containing the file(s), not to 
+  the file itself: <collection-with-tag> expects the path to a directory. I also
+  added </book> after the function call. This is needed, just as with the <collection>
+  and <doc> functions, for the query to be parsed correctly.
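+
+
+  For the sample <books.xml> above, the <collection-with-tag> query should return 
+  the titles of the two books published after 2004, along the lines of:
+
+
+----------------------------
+<title lang="en">Everyday Italian</title>
+<title lang="en">Harry Potter</title>
+----------------------------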
diff --git a/src/site/site.xml b/src/site/site.xml
index 5640976..d64fe0d 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -64,6 +64,9 @@
                 name="Executing a Query"
                 href="user_query.html" />
             <item
+                name="Using HDFS with VXQuery"
+                href="user_query_hdfs.html" />
+            <item
                 name="Running the Test Suite"
                 href="user_running_tests.html" />
         </menu>
diff --git a/vxquery-cli/pom.xml b/vxquery-cli/pom.xml
index 7bf1ff9..c4e11cb 100644
--- a/vxquery-cli/pom.xml
+++ b/vxquery-cli/pom.xml
@@ -126,7 +126,19 @@
             <groupId>org.apache.hyracks</groupId>
             <artifactId>hyracks-dataflow-std</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-hdfs-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-hdfs-2.x</artifactId>
+        </dependency>
     </dependencies>
 
     <reporting>
         <plugins>
diff --git a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
index a02c65d..17287c6 100644
--- a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
+++ b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
@@ -81,7 +81,8 @@
     /**
      * Constructor to use command line options passed.
      *
-     * @param opts Command line options object
+     * @param opts
+     *            Command line options object
      */
     public VXQuery(CmdLineOptions opts) {
         this.opts = opts;
@@ -181,8 +182,13 @@
                     opts.showOET, opts.showRP);
 
             start = opts.timing ? new Date() : null;
-            XMLQueryCompiler compiler = new XMLQueryCompiler(listener, getNodeList(), opts.frameSize,
-                    opts.availableProcessors, opts.joinHashSize, opts.maximumDataSize);
+
+            Map<String, NodeControllerInfo> nodeControllerInfos = null;
+            if (hcc != null) {
+                nodeControllerInfos = hcc.getNodeControllerInfos();
+            }
+            XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeControllerInfos, opts.frameSize,
+                    opts.availableProcessors, opts.joinHashSize, opts.maximumDataSize, opts.hdfsConf);
             resultSetId = createResultSetId();
             CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE),
                     resultSetId, null);
@@ -233,30 +239,13 @@
     }
 
     /**
-     * Get cluster node configuration.
-     *
-     * @return Configuration of node controllers as array of Strings.
-     * @throws Exception
-     */
-    private String[] getNodeList() throws Exception {
-        if (hcc != null) {
-            Map<String, NodeControllerInfo> nodeControllerInfos = hcc.getNodeControllerInfos();
-            String[] nodeList = new String[nodeControllerInfos.size()];
-            int index = 0;
-            for (String node : nodeControllerInfos.keySet()) {
-                nodeList[index++] = node;
-            }
-            return nodeList;
-        }
-        return new String[0];
-    }
-
-    /**
      * Creates a Hyracks dataset, if not already existing with the job frame size, and 1 reader. Allocates a new buffer of size specified in the frame of Hyracks
      * node. Creates new dataset reader with the current job ID and result set ID. Outputs the string in buffer for each frame.
      *
-     * @param spec   JobSpecification object, containing frame size. Current specified job.
-     * @param writer Writer for output of job.
+     * @param spec
+     *            JobSpecification object, containing frame size. Current specified job.
+     * @param writer
+     *            Writer for output of job.
      * @throws Exception
      */
     private void runJob(JobSpecification spec, PrintWriter writer) throws Exception {
@@ -339,7 +328,8 @@
     /**
      * Reads the contents of file given in query into a String. The file is always closed. For XML files UTF-8 encoding is used.
      *
-     * @param query The query with filename to be processed
+     * @param query
+     *            The query with filename to be processed
      * @return UTF-8 formatted query string
      * @throws IOException
      */
@@ -361,8 +351,7 @@
      * Helper class with fields and methods to handle all command line options
      */
     private static class CmdLineOptions {
-        @Option(name = "-available-processors",
-                usage = "Number of available processors. (default: java's available processors)")
+        @Option(name = "-available-processors", usage = "Number of available processors. (default: java's available processors)")
         private int availableProcessors = -1;
 
         @Option(name = "-client-net-ip-address", usage = "IP Address of the ClusterController.")
@@ -422,6 +411,9 @@
         @Option(name = "-x", usage = "Bind an external variable")
         private Map<String, String> bindings = new HashMap<String, String>();
 
+        @Option(name = "-hdfs-conf", usage = "Directory path to Hadoop configuration files")
+        private String hdfsConf = null;
+
         @Argument
         private List<String> arguments = new ArrayList<String>();
     }
diff --git a/vxquery-core/pom.xml b/vxquery-core/pom.xml
index 59cc987..d244818 100644
--- a/vxquery-core/pom.xml
+++ b/vxquery-core/pom.xml
@@ -14,7 +14,8 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -63,7 +64,8 @@
                         <configuration>
                             <target>
                                 <ant antfile="build-xslt.xml" target="build">
-                                    <property name="build.gen-src.dir" value="${project.build.directory}/generated-sources/main/java" />
+                                    <property name="build.gen-src.dir"
+                                        value="${project.build.directory}/generated-sources/main/java" />
                                     <property name="src.code.dir" value="${basedir}/src/main/java" />
                                     <property name="xslt.dir" value="${basedir}/src/main/xslt" />
                                     <property name="classpath.xslt" refid="maven.compile.classpath" />
@@ -80,7 +82,8 @@
                         <configuration>
                             <target>
                                 <ant antfile="build-site.xml" target="build">
-                                    <property name="build.gen-site.dir" value="${project.build.directory}/generated-site/apt" />
+                                    <property name="build.gen-site.dir"
+                                        value="${project.build.directory}/generated-site/apt" />
                                     <property name="src.code.dir" value="${basedir}/src/main/java" />
                                     <property name="xslt.dir" value="${basedir}/src/main/xslt" />
                                     <property name="classpath.xslt" refid="maven.compile.classpath" />
@@ -121,16 +124,13 @@
                     <redirectTestOutputToFile>true</redirectTestOutputToFile>
                 </configuration>
             </plugin>
-      <!--
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-site-plugin</artifactId>
-      </plugin>
-      -->
+            <!-- <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-site-plugin</artifactId> 
+                </plugin> -->
         </plugins>
         <pluginManagement>
             <plugins>
-        <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+                <!--This plugin's configuration is used to store Eclipse m2e settings 
+                    only. It has no influence on the Maven build itself. -->
                 <plugin>
                     <groupId>org.eclipse.m2e</groupId>
                     <artifactId>lifecycle-mapping</artifactId>
@@ -209,6 +209,16 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-hdfs-2.x</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-hdfs-core</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>ant</groupId>
             <artifactId>ant-trax</artifactId>
             <scope>provided</scope>
@@ -259,6 +269,21 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>2.7.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.7.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </dependency>
     </dependencies>
 
     <reporting>
@@ -290,18 +315,10 @@
                     <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
                 </configuration>
             </plugin>
-      <!--
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
-        <version>2.5.2</version>
-        <configuration>
-          <effort>Min</effort>
-          <threshold>Normal</threshold>
-          <excludeFilterFile>findbugs-exclude.xml</excludeFilterFile>
-        </configuration>
-      </plugin>
-      -->
+            <!-- <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>findbugs-maven-plugin</artifactId> 
+                <version>2.5.2</version> <configuration> <effort>Min</effort> <threshold>Normal</threshold> 
+                <excludeFilterFile>findbugs-exclude.xml</excludeFilterFile> </configuration> 
+                </plugin> -->
             <plugin>
                 <groupId>org.apache.rat</groupId>
                 <artifactId>apache-rat-plugin</artifactId>
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
index fd20465..205e0b2 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java
@@ -19,8 +19,8 @@
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.vxquery.compiler.rewriter.rules.ConsolidateDescandantChild;
 import org.apache.vxquery.compiler.rewriter.rules.ConsolidateAssignAggregateRule;
+import org.apache.vxquery.compiler.rewriter.rules.ConsolidateDescandantChild;
 import org.apache.vxquery.compiler.rewriter.rules.ConvertAssignToUnnestRule;
 import org.apache.vxquery.compiler.rewriter.rules.ConvertFromAlgebricksExpressionsRule;
 import org.apache.vxquery.compiler.rewriter.rules.ConvertToAlgebricksExpressionsRule;
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
index 53011d2..74220da 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java
@@ -22,16 +22,6 @@
 import java.util.Arrays;
 
 import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
-import org.apache.vxquery.compiler.rewriter.rules.util.OperatorToolbox;
-import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
-import org.apache.vxquery.datamodel.values.ValueTag;
-import org.apache.vxquery.functions.BuiltinFunctions;
-import org.apache.vxquery.types.BuiltinTypeRegistry;
-import org.apache.vxquery.types.Quantifier;
-import org.apache.vxquery.types.SequenceType;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -46,6 +36,14 @@
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
+import org.apache.vxquery.compiler.rewriter.rules.util.OperatorToolbox;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.functions.BuiltinFunctions;
+import org.apache.vxquery.types.BuiltinTypeRegistry;
+import org.apache.vxquery.types.Quantifier;
+import org.apache.vxquery.types.SequenceType;
 
 public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule {
     final ByteBufferInputStream bbis = new ByteBufferInputStream();
@@ -54,14 +52,13 @@
     final TaggedValuePointable tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
 
     /**
-     * Get the constant value for the collection. Return null for not a collection.
+     * Get the arguments of the collection or collection-with-tag function call. Return null if this is not a collection.
      *
      * @param opRef
      *            Logical operator
-     * @return collection name
+     * @return the arguments of the function call, or null if it is not a collection
      */
-    protected String getCollectionName(Mutable<ILogicalOperator> opRef) {
-        VXQueryConstantValue constantValue;
+    protected String[] getCollectionName(Mutable<ILogicalOperator> opRef) {
 
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
@@ -82,13 +79,32 @@
             return null;
         }
         AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression;
-        if (!functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier())) {
+        if (!functionCall.getFunctionIdentifier()
+                .equals(BuiltinFunctions.FN_COLLECTION_WITH_TAG_2.getFunctionIdentifier())
+                && !functionCall.getFunctionIdentifier()
+                        .equals(BuiltinFunctions.FN_COLLECTION_1.getFunctionIdentifier())) {
             return null;
         }
 
-        ILogicalExpression logicalExpression2 = (ILogicalExpression) functionCall.getArguments().get(0).getValue();
+        // Get arguments
+        int size = functionCall.getArguments().size();
+        if (size > 0) {
+            String args[] = new String[size];
+            for (int i = 0; i < size; i++) {
+                args[i] = getArgument(functionCall, opRef, i);
+            }
+            return args;
+        }
+        return null;
+    }
+
+    private String getArgument(AbstractFunctionCallExpression functionCall, Mutable<ILogicalOperator> opRef, int pos) {
+        VXQueryConstantValue constantValue;
+        ILogicalExpression logicalExpression2 = (ILogicalExpression) functionCall.getArguments().get(pos).getValue();
-        if (logicalExpression2.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+        if (logicalExpression2 == null
+                || logicalExpression2.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
             return null;
         }
         VariableReferenceExpression vre = (VariableReferenceExpression) logicalExpression2;
         Mutable<ILogicalOperator> opRef3 = OperatorToolbox.findProducerOf(opRef, vre.getVariableReference());
@@ -111,7 +127,6 @@
         } else {
             return null;
         }
-
         // Constant value is now in a TaggedValuePointable. Convert the value into a java String.
         tvp.set(constantValue.getValue(), 0, constantValue.getValue().length);
         String collectionName = null;
@@ -121,11 +136,12 @@
                 bbis.setByteBuffer(ByteBuffer.wrap(Arrays.copyOfRange(stringp.getByteArray(), stringp.getStartOffset(),
                         stringp.getLength() + stringp.getStartOffset())), 0);
                 collectionName = di.readUTF();
+                return collectionName;
             } catch (IOException e) {
                 e.printStackTrace();
             }
         }
-        return collectionName;
+        return null;
     }
 
     @Override
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
index cc857a1..8ed8bb1 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceCollectionRule.java
@@ -64,9 +64,10 @@
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
         VXQueryOptimizationContext vxqueryContext = (VXQueryOptimizationContext) context;
-        String collectionName = getCollectionName(opRef);
+        String args[] = getCollectionName(opRef);
 
-        if (collectionName != null) {
+        if (args != null) {
+            String collectionName = args[0];
             // Build the new operator and update the query plan.
             int collectionId = vxqueryContext.newCollectionId();
             VXQueryCollectionDataSource ds = VXQueryCollectionDataSource.create(collectionId, collectionName,
@@ -74,6 +75,12 @@
             if (ds != null) {
                 ds.setTotalDataSources(vxqueryContext.getTotalDataSources());
 
+                // Check if the call is for collection-with-tag
+                if (args.length == 2) {
+                    ds.setTag(args[1]);
+                }
+
                 // Known to be true because of collection name.
                 AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
                 UnnestOperator unnest = (UnnestOperator) op;
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
index 38f03a4..3b9371d 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
+++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
@@ -128,6 +128,14 @@
         <!-- Collection operator is added during the rewrite rules phase.  -->
     </function>
     
+    <!-- fn:collection-with-tag($arg1  as xs:string?, $arg2 as xs:string?) as  node()* -->
+    <function name="fn:collection-with-tag">
+        <param name="arg1" type="xs:string?"/>
+        <param name="arg2" type="xs:string?"/>
+        <return type="node()*"/>
+        <!-- CollectionWithTag operator is added during the rewrite rules phase.  -->
+    </function>
+    
     <!-- fn:compare($comparand1  as xs:string?, $comparand2 as xs:string?)  as xs:integer?  -->
     <function name="fn:compare">
         <param name="comparand1" type="xs:string?"/>
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
new file mode 100644
index 0000000..dcbfe94
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.hdfs2;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.hdfs.ContextFactory;
+import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+public class HDFSFunctions {
+
+    private Configuration conf;
+    private FileSystem fs;
+    private String conf_path;
+    private Job job;
+    private InputFormat inputFormat;
+    private List<InputSplit> splits;
+    private ArrayList<ArrayList<String>> nodes;
+    private HashMap<Integer, String> schedule;
+    private final String TEMP = "java.io.tmpdir";
+    private final String dfs_path = "vxquery_splits_schedule.txt";
+    private final String filepath = System.getProperty(TEMP) + "splits_schedule.txt";
+    protected static final Logger LOGGER = Logger.getLogger(HDFSFunctions.class.getName());
+    private final Map<String, NodeControllerInfo> nodeControllerInfos;
+
+    /**
+     * Create the configuration and add the paths for core-site and hdfs-site as resources.
+     * Initialize an instance of HDFS FileSystem for this configuration.
+     * 
+     * @param nodeControllerInfos
+     * @param hdfsConf
+     */
+    public HDFSFunctions(Map<String, NodeControllerInfo> nodeControllerInfos, String hdfsConf) {
+        this.conf = new Configuration();
+        this.nodeControllerInfos = nodeControllerInfos;
+        this.conf_path = hdfsConf;
+    }
+
+    /**
+     * Create the needed objects for reading the splits of the filepath given as argument.
+     * This method should run before the scheduleSplits method.
+     * 
+     * @param filepath
+     * @param tag
+     */
+    @SuppressWarnings({ "deprecation", "unchecked" })
+    public void setJob(String filepath, String tag) {
+        try {
+            conf.set("start_tag", "<" + tag + ">");
+            conf.set("end_tag", "</" + tag + ">");
+            job = new Job(conf, "Read from HDFS");
+            Path input = new Path(filepath);
+            FileInputFormat.addInputPath(job, input);
+            job.setInputFormatClass(XmlCollectionWithTagInputFormat.class);
+            inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+            splits = inputFormat.getSplits(job);
+        } catch (IOException | ClassNotFoundException | InterruptedException e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Returns true if the file path exists or it is located somewhere in the home directory of the user that called the function.
+     * Searches in subdirectories of the home directory too.
+     * 
+     * @param filename
+     * @return
+     * @throws IOException
+     * @throws IllegalArgumentException
+     */
+    public boolean isLocatedInHDFS(String filename) throws IllegalArgumentException, IOException {
+        //search file path
+        if (fs.exists(new Path(filename))) {
+            return true;
+        }
+        return searchInDirectory(fs.getHomeDirectory(), filename) != null;
+    }
+
+    /**
+     * Searches the given directory for the file.
+     * 
+     * @param directory
+     *            to search
+     * @param filename
+     *            of file we want
+     * @return the path if the file exists in this directory, otherwise null
+     */
+    public Path searchInDirectory(Path directory, String filename) {
+        //Search the files and folder in this Path to find the one matching the filename.
+        try {
+            RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true);
+            String[] parts;
+            Path path;
+            while (it.hasNext()) {
+                path = it.next().getPath();
+                parts = path.toString().split("/");
+                if (parts[parts.length - 1].equals(filename)) {
+                    return path;
+                }
+            }
+        } catch (IOException e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe(e.getMessage());
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Locate the HDFS configuration directory. If it was not passed on the command line,
+     * fall back to the HADOOP_CONF_DIR system environment variable.
+     * 
+     * @return true if the HDFS configuration directory was found
+     */
+    private boolean locateConf() {
+        if (this.conf_path == null) {
+            //As a last resort, try getting the configuration from the system environment
+            //Some systems won't have this set.
+            this.conf_path = System.getenv("HADOOP_CONF_DIR");
+        }
+        return this.conf_path != null;
+    }
+
+    /**
+     * Upload a file or directory to HDFS. filepath is the path in the local file system and dir is the destination path in HDFS.
+     * 
+     * @param filepath
+     * @param dir
+     * @return true if the upload succeeded
+     */
+    public boolean put(String filepath, String dir) {
+        if (this.fs != null) {
+            Path path = new Path(filepath);
+            Path dest = new Path(dir);
+            try {
+                if (fs.exists(dest)) {
+                    fs.delete(dest, true); //recursive delete
+                }
+            } catch (IOException e) {
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe(e.getMessage());
+                }
+            }
+            try {
+                fs.copyFromLocalFile(path, dest);
+                return true;
+            } catch (IOException e) {
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe(e.getMessage());
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Get an instance of the HDFS file system if it is configured correctly.
+     * Return null if there is no instance.
+     * 
+     * @return the HDFS FileSystem instance, or null
+     */
+    public FileSystem getFileSystem() {
+        if (locateConf()) {
+            conf.addResource(new Path(this.conf_path + "/core-site.xml"));
+            conf.addResource(new Path(this.conf_path + "/hdfs-site.xml"));
+            try {
+                fs = FileSystem.get(conf);
+                return this.fs;
+            } catch (IOException e) {
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe(e.getMessage());
+                }
+            }
+        } else {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe("Could not locate HDFS configuration folder.");
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Create a HashMap whose keys are hostnames and whose values are the indices of the splits stored on that host.
+     * 
+     * @return a map from hostname to the indices of the splits located there
+     * @throws IOException
+     */
+    public HashMap<String, ArrayList<Integer>> getLocationsOfSplits() throws IOException {
+        HashMap<String, ArrayList<Integer>> splits_map = new HashMap<String, ArrayList<Integer>>();
+        ArrayList<Integer> temp;
+        int i = 0;
+        String hostname;
+        for (InputSplit s : this.splits) {
+            SplitLocationInfo info[] = s.getLocationInfo();
+            hostname = info[0].getLocation();
+            if (splits_map.containsKey(hostname)) {
+                temp = splits_map.get(hostname);
+                temp.add(i);
+            } else {
+                temp = new ArrayList<Integer>();
+                temp.add(i);
+                splits_map.put(hostname, temp);
+            }
+            i++;
+        }
+
+        return splits_map;
+    }
+
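+    /**
+     * Assign each input split to a node. Splits are first assigned to nodes that host them
+     * locally; any splits left over are distributed round-robin over the nodes that received
+     * no local splits.
+     */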
+    public void scheduleSplits() throws IOException, ParserConfigurationException, SAXException {
+        schedule = new HashMap<Integer, String>();
+        ArrayList<String> empty = new ArrayList<String>();
+        HashMap<String, ArrayList<Integer>> splits_map = this.getLocationsOfSplits();
+        readNodesFromXML();
+        int count = this.splits.size();
+
+        ArrayList<Integer> splits;
+        String node;
+        for (ArrayList<String> info : this.nodes) {
+            node = info.get(1);
+            if (splits_map.containsKey(node)) {
+                splits = splits_map.get(node);
+                for (Integer split : splits) {
+                    schedule.put(split, node);
+                    count--;
+                }
+                splits_map.remove(node);
+            } else {
+                empty.add(node);
+            }
+        }
+
+        //Check if every split got assigned to a node
+        if (count != 0) {
+            ArrayList<Integer> remaining = new ArrayList<Integer>();
+            // Find remaining splits
+            int i = 0;
+            for (InputSplit s : this.splits) {
+                if (!schedule.containsKey(i)) {
+                    remaining.add(i);
+                }
+                i++;
+            }
+
+            if (empty.size() != 0) {
+                int node_number = 0;
+                for (int split : remaining) {
+                    if (node_number == empty.size()) {
+                        node_number = 0;
+                    }
+                    schedule.put(split, empty.get(node_number));
+                    node_number++;
+                }
+            }
+        }
+    }
+
+    /**
+     * Read the node id and the IP address of every node from the node controller information
+     * and save them inside nodes.
+     * 
+     * @throws ParserConfigurationException
+     * @throws IOException
+     * @throws SAXException
+     */
+    public void readNodesFromXML() throws ParserConfigurationException, SAXException, IOException {
+        DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+        DocumentBuilder dBuilder;
+        dBuilder = dbFactory.newDocumentBuilder();
+        nodes = new ArrayList<ArrayList<String>>();
+        for (NodeControllerInfo ncInfo : nodeControllerInfos.values()) {
+            //Will this include the master node? Is that bad?
+            ArrayList<String> info = new ArrayList<String>();
+            info.add(ncInfo.getNodeId());
+            info.add(ncInfo.getNetworkAddress().getAddress());
+            nodes.add(info);
+        }
+    }
+
+    /**
+     * Writes the schedule to a temporary file, then uploads the file to the HDFS.
+     * 
+     * @throws UnsupportedEncodingException
+     * @throws FileNotFoundException
+     */
+    public void addScheduleToDistributedCache() throws FileNotFoundException, UnsupportedEncodingException {
+        PrintWriter writer;
+        writer = new PrintWriter(filepath, "UTF-8");
+        for (int split : this.schedule.keySet()) {
+            writer.write(split + "," + this.schedule.get(split));
+        }
+        writer.close();
+        // Add file to HDFS
+        this.put(filepath, dfs_path);
+    }
+
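+    /**
+     * Create a record reader over the splits of the current job and return the reader for the
+     * first split that initializes successfully, or null if no reader could be created.
+     */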
+    public RecordReader getReader() {
+
+        List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+        for (int i = 0; i < splits.size(); i++) {
+            fileSplits.add((FileSplit) splits.get(i));
+        }
+        FileSplitsFactory splitsFactory;
+        try {
+            splitsFactory = new FileSplitsFactory(fileSplits);
+            List<FileSplit> inputSplits = splitsFactory.getSplits();
+            ContextFactory ctxFactory = new ContextFactory();
+            int size = inputSplits.size();
+            for (int i = 0; i < size; i++) {
+                /**
+                 * read the split
+                 */
+                TaskAttemptContext context;
+                try {
+                    context = ctxFactory.createContext(job.getConfiguration(), i);
+                    RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
+                    reader.initialize(inputSplits.get(i), context);
+                    return reader;
+                } catch (IOException | InterruptedException e) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.severe(e.getMessage());
+                    }
+                }
+            }
+        } catch (HyracksDataException e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe(e.getMessage());
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return schedule.
+     */
+    public HashMap<Integer, String> getSchedule() {
+        return this.schedule;
+    }
+
+    /**
+     * Return the splits belonging to this node for the existing schedule.
+     * 
+     * @param node
+     * @return
+     */
+    public ArrayList<Integer> getScheduleForNode(String node) {
+        ArrayList<Integer> node_schedule = new ArrayList<Integer>();
+        for (int split : this.schedule.keySet()) {
+            if (node.equals(this.schedule.get(split))) {
+                node_schedule.add(split);
+            }
+        }
+        return node_schedule;
+    }
+
+    public List<InputSplit> getSplits() {
+        return this.splits;
+    }
+
+    public Job getJob() {
+        return this.job;
+    }
+
+    public InputFormat getinputFormat() {
+        return this.inputFormat;
+    }
+
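+    /**
+     * Parse the given XML string into a DOM Document, or return null if parsing fails.
+     */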
+    public Document convertStringToDocument(String xmlStr) {
+        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+        DocumentBuilder builder;
+        try {
+            builder = factory.newDocumentBuilder();
+            Document doc = builder.parse(new InputSource(new StringReader(xmlStr)));
+            return doc;
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe(e.getMessage());
+            }
+        }
+        return null;
+    }
+}
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
new file mode 100644
index 0000000..1d053b6
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.hdfs2;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * Reads records that are delimited by a specific begin/end tag.
+ */
+public class XmlCollectionWithTagInputFormat extends TextInputFormat {
+
+    public static String STARTING_TAG;
+    public static String ENDING_TAG;
+
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+        try {
+            STARTING_TAG = context.getConfiguration().get("start_tag");
+            ENDING_TAG = context.getConfiguration().get("end_tag");
+            return new XmlRecordReader((FileSplit) split, context.getConfiguration());
+        } catch (IOException ioe) {
+            return null;
+        }
+    }
+
+    /**
+     * XmlRecordReader reads through a given XML document and outputs the XML blocks delimited by the
+     * configured start and end tags as records.
+     */
+    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
+
+        private final byte[] end_tag;
+        private final byte[] start_tag;
+        private final long start;
+        private final long end;
+        private final FSDataInputStream fsin;
+        private final DataOutputBuffer buffer = new DataOutputBuffer();
+        private LongWritable currentKey;
+        private Text currentValue;
+        BlockLocation[] blocks;
+        public static byte[] nl = "\n".getBytes();
+
+        public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
+            end_tag = ENDING_TAG.getBytes(Charsets.UTF_8);
+            start_tag = STARTING_TAG.getBytes(Charsets.UTF_8);
+
+            // start of the split
+            start = split.getStart();
+            // end of the split
+            end = start + split.getLength();
+            Path file = split.getPath();
+            FileSystem fs = file.getFileSystem(conf);
+            FileStatus fStatus = fs.getFileStatus(file);
+            blocks = fs.getFileBlockLocations(fStatus, 0, fStatus.getLen());
+            // open the file and seek to the start of the split
+            fsin = fs.open(split.getPath());
+            fsin.seek(start);
+        }
+
+        /**
+         * Get next block item
+         * 
+         * @param key
+         * @param value
+         * @return
+         * @throws IOException
+         */
+        private boolean next(LongWritable key, Text value) throws IOException {
+            if (fsin.getPos() < end) {
+                try {
+                    if (readBlock(true)) {
+                        key.set(fsin.getPos());
+                        value.set(buffer.getData(), 0, buffer.getLength());
+                        return true;
+                    }
+                } finally {
+                    buffer.reset();
+                }
+            }
+            return false;
+        }
+
+        @Override
+        public void close() throws IOException {
+            Closeables.close(fsin, true);
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+            return (fsin.getPos() - start) / (float) (end - start);
+        }
+
+        /**
+         * Read tagged records into the buffer until the end of the split is reached; the last
+         * record may extend past the split end, up to its closing tag.
+         * 
+         * @param withinBlock
+         * @return
+         * @throws IOException
+         */
+        private boolean readBlock(boolean withinBlock) throws IOException {
+            boolean read = false;
+
+            while (true) {
+                if (fsin.getPos() < end) {
+                    if (readUntilMatch(start_tag, false)) {
+                        buffer.write(start_tag);
+                        readUntilMatch(end_tag, true);
+                        read = true;
+                    }
+                } else {
+                    return read;
+                }
+            }
+        }
+
+        /**
+         * Read from the stream until the end of the file is reached or the bytes in match[] are found.
+         * 
+         * @param match
+         * @param withinBlock
+         * @return
+         * @throws IOException
+         */
+        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
+            int i = 0;
+            while (true) {
+                int b = fsin.read();
+                // end of file:
+                if (b == -1) {
+                    return false;
+                }
+                // save to buffer:
+                if (withinBlock) {
+                    buffer.write(b);
+                }
+
+                // check if we're matching:
+                if (b == match[i]) {
+                    i++;
+                    if (i >= match.length) {
+                        return true;
+                    }
+                } else {
+                    i = 0;
+                }
+                // see if we've passed the stop point:
+                if (!withinBlock && i == 0 && fsin.getPos() >= end) {
+                    return false;
+                }
+            }
+        }
+
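+        /**
+         * If the current position is exactly at the end of an HDFS block, return the index of the
+         * next block; otherwise return 0.
+         */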
+        private int nextBlock() throws IOException {
+            long pos = fsin.getPos();
+            long block_length;
+            for (int i = 0; i < blocks.length; i++) {
+                block_length = blocks[i].getOffset() + blocks[i].getLength();
+                if (pos == block_length) {
+                    return i + 1;
+                }
+            }
+            return 0;
+        }
+
+        @Override
+        public LongWritable getCurrentKey() throws IOException, InterruptedException {
+            return currentKey;
+        }
+
+        @Override
+        public Text getCurrentValue() throws IOException, InterruptedException {
+            return currentValue;
+        }
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            currentKey = new LongWritable();
+            currentValue = new Text();
+            return next(currentKey, currentValue);
+        }
+    }
+}
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
index 62d1ca7..3c2d6aa 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
@@ -37,6 +37,7 @@
     private String[] collectionPartitions;
     private final List<Integer> childSeq;
     private int totalDataSources;
+    private String tag;
 
     private final Object[] types;
 
@@ -60,6 +61,7 @@
             }
         };
         this.childSeq = new ArrayList<Integer>();
+        this.tag = null;
     }
 
     public int getTotalDataSources() {
@@ -77,7 +79,7 @@
     public String[] getPartitions() {
         return collectionPartitions;
     }
-    
+
     public void setPartitions(String[] collectionPartitions) {
         this.collectionPartitions = collectionPartitions;
     }
@@ -86,6 +88,14 @@
         return collectionPartitions.length;
     }
 
+    public String getTag() {
+        return this.tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+
     @Override
     public String getId() {
         return collectionName;
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index d5966b8..b8dca63 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -16,15 +16,37 @@
  */
 package org.apache.vxquery.metadata;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.xml.parsers.ParserConfigurationException;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameFieldAppender;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -38,10 +60,14 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.hdfs.ContextFactory;
+import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory;
 import org.apache.vxquery.context.DynamicContext;
+import org.apache.vxquery.hdfs2.HDFSFunctions;
 import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
 import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
 import org.apache.vxquery.xmlparser.XMLParser;
+import org.xml.sax.SAXException;
 
 public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -50,15 +76,23 @@
     private String[] collectionPartitions;
     private List<Integer> childSeq;
     protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName());
+    private HDFSFunctions hdfs;
+    private String tag;
+    private final String START_TAG = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n";
+    private final String hdfsConf;
+    private final Map<String, NodeControllerInfo> nodeControllerInfos;
 
     public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds,
-            RecordDescriptor rDesc) {
+            RecordDescriptor rDesc, String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) {
         super(spec, 1, 1);
         collectionPartitions = ds.getPartitions();
         dataSourceId = (short) ds.getDataSourceId();
         totalDataSources = (short) ds.getTotalDataSources();
         childSeq = ds.getChildSeq();
         recordDescriptors[0] = rDesc;
+        this.tag = ds.getTag();
+        this.hdfsConf = hdfsConf;
+        this.nodeControllerInfos = nodeControllerInfos;
     }
 
     @Override
@@ -83,31 +117,155 @@
             public void open() throws HyracksDataException {
                 appender.reset(frame, true);
                 writer.open();
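+                // Set up the HDFS helper with the node controller info and the HDFS configuration
+                // directory so collections on hdfs:/ paths can be resolved in nextFrame().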
+                hdfs = new HDFSFunctions(nodeControllerInfos, hdfsConf);
             }
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 fta.reset(buffer);
                 String collectionModifiedName = collectionName.replace("${nodeId}", nodeId);
-                File collectionDirectory = new File(collectionModifiedName);
-
-                // Go through each tuple.
-                if (collectionDirectory.isDirectory()) {
-                    for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
-                        @SuppressWarnings("unchecked")
-                        Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(),
-                                TrueFileFilter.INSTANCE);
-                        while (it.hasNext()) {
-                            File xmlDocument = it.next();
-                            if (LOGGER.isLoggable(Level.FINE)) {
-                                LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath());
+                if (!collectionModifiedName.contains("hdfs:/")) {
+                    File collectionDirectory = new File(collectionModifiedName);
+                    // Check whether the directory exists on the local file system.
+                    if (collectionDirectory.exists()) {
+                        // Go through each tuple.
+                        if (collectionDirectory.isDirectory()) {
+                            for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
+                                Iterator<File> it = FileUtils.iterateFiles(collectionDirectory,
+                                        new VXQueryIOFileFilter(), TrueFileFilter.INSTANCE);
+                                while (it.hasNext()) {
+                                    File xmlDocument = it.next();
+                                    if (LOGGER.isLoggable(Level.FINE)) {
+                                        LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath());
+                                    }
+                                    parser.parseElements(xmlDocument, writer, tupleIndex);
+                                }
                             }
-                            parser.parseElements(xmlDocument, writer, tupleIndex);
+                        } else {
+                            throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":"
+                                    + collectionDirectory.getAbsolutePath() + ") passed to collection.");
                         }
                     }
                 } else {
-                    throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":"
-                            + collectionDirectory.getAbsolutePath() + ") passed to collection.");
+                    // Otherwise, look for the collection in HDFS.
+                    // Get an instance of the HDFS file system.
+                    FileSystem fs = hdfs.getFileSystem();
+                    if (fs != null) {
+                        collectionModifiedName = collectionModifiedName.replaceAll("hdfs:/", "");
+                        Path directory = new Path(collectionModifiedName);
+                        Path xmlDocument;
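+                        // With a tag, the directory is read through Hadoop input splits scheduled for
+                        // this node and each split is cut into documents at the tag boundaries;
+                        // without a tag, every file under the directory is parsed as a whole document.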
+                        if (tag != null) {
+                            hdfs.setJob(directory.toString(), tag);
+                            tag = "<" + tag + ">";
+                            Job job = hdfs.getJob();
+                            InputFormat inputFormat = hdfs.getinputFormat();
+                            try {
+                                hdfs.scheduleSplits();
+                                ArrayList<Integer> schedule = hdfs
+                                        .getScheduleForNode(InetAddress.getLocalHost().getHostAddress());
+                                List<InputSplit> splits = hdfs.getSplits();
+                                List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+                                for (int i : schedule) {
+                                    fileSplits.add((FileSplit) splits.get(i));
+                                }
+                                FileSplitsFactory splitsFactory = new FileSplitsFactory(fileSplits);
+                                List<FileSplit> inputSplits = splitsFactory.getSplits();
+                                ContextFactory ctxFactory = new ContextFactory();
+                                int size = inputSplits.size();
+                                InputStream stream;
+                                String value;
+                                RecordReader reader;
+                                TaskAttemptContext context;
+                                for (int i = 0; i < size; i++) {
+                                    //read split
+                                    context = ctxFactory.createContext(job.getConfiguration(), i);
+                                    try {
+                                        reader = inputFormat.createRecordReader(inputSplits.get(i), context);
+                                        reader.initialize(inputSplits.get(i), context);
+                                        while (reader.nextKeyValue()) {
+                                            value = reader.getCurrentValue().toString();
+                                            // Split the value if it contains more than one item with the tag.
+                                            if (StringUtils.countMatches(value, tag) > 1) {
+                                                String items[] = value.split(tag);
+                                                for (String item : items) {
+                                                    if (item.length() > 0) {
+                                                        item = START_TAG + tag + item;
+                                                        stream = new ByteArrayInputStream(
+                                                                item.getBytes(StandardCharsets.UTF_8));
+                                                        parser.parseHDFSElements(stream, writer, fta, i);
+                                                    }
+                                                }
+                                            } else {
+                                                value = START_TAG + value;
+                                                // Create an input stream for the current value and send it to the parser.
+                                                stream = new ByteArrayInputStream(
+                                                        value.getBytes(StandardCharsets.UTF_8));
+                                                parser.parseHDFSElements(stream, writer, fta, i);
+                                            }
+                                        }
+
+                                    } catch (InterruptedException e) {
+                                        if (LOGGER.isLoggable(Level.SEVERE)) {
+                                            LOGGER.severe(e.getMessage());
+                                        }
+                                    }
+                                }
+
+                            } catch (IOException e) {
+                                if (LOGGER.isLoggable(Level.SEVERE)) {
+                                    LOGGER.severe(e.getMessage());
+                                }
+                            } catch (ParserConfigurationException e) {
+                                if (LOGGER.isLoggable(Level.SEVERE)) {
+                                    LOGGER.severe(e.getMessage());
+                                }
+                            } catch (SAXException e) {
+                                if (LOGGER.isLoggable(Level.SEVERE)) {
+                                    LOGGER.severe(e.getMessage());
+                                }
+                            }
+                        } else {
+                            try {
+                                //check if the path exists and is a directory
+                                if (fs.exists(directory) && fs.isDirectory(directory)) {
+                                    for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
+                                        //read every file in the directory
+                                        RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true);
+                                        while (it.hasNext()) {
+                                            xmlDocument = it.next().getPath();
+                                            if (fs.isFile(xmlDocument)) {
+                                                if (LOGGER.isLoggable(Level.FINE)) {
+                                                    LOGGER.fine(
+                                                            "Starting to read XML document: " + xmlDocument.getName());
+                                                }
+                                                // Create an input stream for the file currently being read and send it to the parser.
+                                                InputStream in = fs.open(xmlDocument).getWrappedStream();
+                                                parser.parseHDFSElements(in, writer, fta, tupleIndex);
+                                            }
+                                        }
+                                    }
+                                } else {
+                                    throw new HyracksDataException("Invalid HDFS directory parameter (" + nodeId + ":"
+                                            + directory + ") passed to collection.");
+                                }
+                            } catch (FileNotFoundException e) {
+                                if (LOGGER.isLoggable(Level.SEVERE)) {
+                                    LOGGER.severe(e.getMessage());
+                                }
+                            } catch (IOException e) {
+                                if (LOGGER.isLoggable(Level.SEVERE)) {
+                                    LOGGER.severe(e.getMessage());
+                                }
+                            }
+                        }
+                        try {
+                            fs.close();
+                        } catch (IOException e) {
+                            if (LOGGER.isLoggable(Level.SEVERE)) {
+                                LOGGER.severe(e.getMessage());
+                            }
+                        }
+                    }
                 }
             }
 
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
index 238f6d3..820c365 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryMetadataProvider.java
@@ -22,8 +22,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.vxquery.context.StaticContext;
-
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -45,6 +43,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
 import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -52,16 +51,22 @@
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import org.apache.vxquery.context.StaticContext;
 
 public class VXQueryMetadataProvider implements IMetadataProvider<String, String> {
     private final String[] nodeList;
     private final Map<String, File> sourceFileMap;
     private final StaticContext staticCtx;
+    private final String hdfsConf;
+    private final Map<String, NodeControllerInfo> nodeControllerInfos;
 
-    public VXQueryMetadataProvider(String[] nodeList, Map<String, File> sourceFileMap, StaticContext staticCtx) {
+    public VXQueryMetadataProvider(String[] nodeList, Map<String, File> sourceFileMap, StaticContext staticCtx,
+            String hdfsConf, Map<String, NodeControllerInfo> nodeControllerInfos) {
         this.nodeList = nodeList;
         this.sourceFileMap = sourceFileMap;
         this.staticCtx = staticCtx;
+        this.hdfsConf = hdfsConf;
+        this.nodeControllerInfos = nodeControllerInfos;
     }
 
     @Override
@@ -82,7 +87,7 @@
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         VXQueryCollectionDataSource ds = (VXQueryCollectionDataSource) dataSource;
         if (sourceFileMap != null) {
             final int len = ds.getPartitions().length;
@@ -95,7 +100,8 @@
             ds.setPartitions(collectionPartitions);
         }
         RecordDescriptor rDesc = new RecordDescriptor(new ISerializerDeserializer[opSchema.getSize()]);
-        IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc);
+        IOperatorDescriptor scanner = new VXQueryCollectionOperatorDescriptor(jobSpec, ds, rDesc, this.hdfsConf,
+                this.nodeControllerInfos);
 
         AlgebricksPartitionConstraint constraint = getClusterLocations(nodeList, ds.getPartitionCount());
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, constraint);
@@ -129,7 +135,7 @@
     @Override
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
 
@@ -155,7 +161,7 @@
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
 
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
index d8c4e68..d394bbc 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java
@@ -19,10 +19,21 @@
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import org.apache.vxquery.context.DynamicContext;
 import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
 import org.apache.vxquery.datamodel.accessors.TypedPointables;
@@ -33,6 +44,7 @@
 import org.apache.vxquery.datamodel.values.ValueTag;
 import org.apache.vxquery.exceptions.ErrorCode;
 import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.hdfs2.HDFSFunctions;
 import org.apache.vxquery.runtime.functions.arithmetic.AbstractArithmeticOperation;
 import org.apache.vxquery.runtime.functions.cast.CastToDoubleOperation;
 import org.apache.vxquery.runtime.functions.comparison.AbstractValueComparisonOperation;
@@ -42,14 +54,6 @@
 import org.apache.vxquery.types.BuiltinTypeRegistry;
 import org.apache.vxquery.xmlparser.XMLParser;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.DoublePointable;
-import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
 public class FunctionHelper {
 
     public static void arithmeticOperation(AbstractArithmeticOperation aOp, DynamicContext dCtx,
@@ -1213,8 +1217,33 @@
         } catch (SystemException e) {
             throw new HyracksDataException(e);
         }
-        File file = new File(fName);
-        parser.parseDocument(file, abvs);
+        if (!fName.contains("hdfs:/")) {
+            File file = new File(fName);
+            if (file.exists()) {
+                parser.parseDocument(file, abvs);
+            }
+        }
+        // Else check the HDFS file system.
+        else {
+            fName = fName.replaceAll("hdfs:/", "");
+            HDFSFunctions hdfs = new HDFSFunctions(null, null);
+            FileSystem fs = hdfs.getFileSystem();
+            if (fs != null) {
+                Path xmlDocument = new Path(fName);
+                try {
+                    if (fs.exists(xmlDocument)) {
+                        InputStream in = fs.open(xmlDocument).getWrappedStream();
+                        parser.parseHDFSDocument(in, abvs);
+                    }
+                } catch (FileNotFoundException e) {
+                    // The document does not exist on HDFS; report the error and continue.
+                    System.err.println(e);
+                } catch (IOException e) {
+                    // Reading the document from HDFS failed; report the error and continue.
+                    System.err.println(e);
+                }
+            }
+        }
     }
 
     public static boolean transformThenCompareMinMaxTaggedValues(AbstractValueComparisonOperation aOp,
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
index 77d500c..a62a26c 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java
@@ -19,6 +19,7 @@
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.util.ArrayList;
@@ -28,6 +29,7 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.vxquery.context.StaticContext;
 import org.apache.vxquery.exceptions.VXQueryFileNotFoundException;
 import org.apache.vxquery.exceptions.VXQueryParseException;
@@ -98,8 +100,7 @@
         }
     }
 
-    public void parseElements(File file, IFrameWriter writer, int tupleIndex)
-            throws HyracksDataException {
+    public void parseElements(File file, IFrameWriter writer, int tupleIndex) throws HyracksDataException {
         try {
             Reader input;
             if (bufferSize > 0) {
@@ -126,4 +127,48 @@
         }
     }
 
+    public void parseHDFSElements(InputStream inputStream, IFrameWriter writer, FrameTupleAccessor fta, int tupleIndex)
+            throws IOException {
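+        // Parse XML from an HDFS input stream and forward the selected elements to the frame
+        // writer for the given tuple index, mirroring parseElements() for local files.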
+        try {
+            Reader input;
+            if (bufferSize > 0) {
+                input = new BufferedReader(new InputStreamReader(inputStream), bufferSize);
+            } else {
+                input = new InputStreamReader(inputStream);
+            }
+            in.setCharacterStream(input);
+            handler.setupElementWriter(writer, tupleIndex);
+            parser.parse(in);
+            input.close();
+        } catch (IOException e) {
+            HyracksDataException hde = new HyracksDataException(e);
+            hde.setNodeId(nodeId);
+            throw hde;
+        } catch (SAXException e) {
+            HyracksDataException hde = new HyracksDataException(e);
+            hde.setNodeId(nodeId);
+            throw hde;
+        }
+    }
+
+    public void parseHDFSDocument(InputStream inputStream, ArrayBackedValueStorage abvs) throws HyracksDataException {
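+        // Parse a complete XML document from an HDFS input stream and write the result into abvs.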
+        try {
+            Reader input;
+            if (bufferSize > 0) {
+                input = new BufferedReader(new InputStreamReader(inputStream), bufferSize);
+            } else {
+                input = new InputStreamReader(inputStream);
+            }
+            in.setCharacterStream(input);
+            parser.parse(in);
+            handler.writeDocument(abvs);
+            input.close();
+        } catch (IOException e) {
+            HyracksDataException hde = new HyracksDataException(e);
+            hde.setNodeId(nodeId);
+            throw hde;
+        } catch (SAXException e) {
+            HyracksDataException hde = new HyracksDataException(e);
+            hde.setNodeId(nodeId);
+            throw hde;
+        }
+    }
 }
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
index e42caba..8a044ea 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlquery/query/XMLQueryCompiler.java
@@ -17,28 +17,7 @@
 import java.io.Reader;
 import java.util.ArrayList;
 import java.util.List;
-
-import org.apache.vxquery.compiler.CompilerControlBlock;
-import org.apache.vxquery.compiler.algebricks.VXQueryBinaryBooleanInspectorFactory;
-import org.apache.vxquery.compiler.algebricks.VXQueryBinaryIntegerInspectorFactory;
-import org.apache.vxquery.compiler.algebricks.VXQueryComparatorFactoryProvider;
-import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
-import org.apache.vxquery.compiler.algebricks.VXQueryExpressionRuntimeProvider;
-import org.apache.vxquery.compiler.algebricks.VXQueryNullWriterFactory;
-import org.apache.vxquery.compiler.algebricks.VXQueryPrinterFactoryProvider;
-import org.apache.vxquery.compiler.algebricks.prettyprint.VXQueryLogicalExpressionPrettyPrintVisitor;
-import org.apache.vxquery.compiler.rewriter.RewriteRuleset;
-import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext;
-import org.apache.vxquery.exceptions.ErrorCode;
-import org.apache.vxquery.exceptions.SystemException;
-import org.apache.vxquery.metadata.VXQueryMetadataProvider;
-import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFactoryProvider;
-import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFamilyProvider;
-import org.apache.vxquery.types.BuiltinTypeRegistry;
-import org.apache.vxquery.types.Quantifier;
-import org.apache.vxquery.types.SequenceType;
-import org.apache.vxquery.xmlquery.ast.ModuleNode;
-import org.apache.vxquery.xmlquery.translator.XMLQueryTranslator;
+import java.util.Map;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
@@ -67,16 +46,40 @@
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
+import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.vxquery.compiler.CompilerControlBlock;
+import org.apache.vxquery.compiler.algebricks.VXQueryBinaryBooleanInspectorFactory;
+import org.apache.vxquery.compiler.algebricks.VXQueryBinaryIntegerInspectorFactory;
+import org.apache.vxquery.compiler.algebricks.VXQueryComparatorFactoryProvider;
+import org.apache.vxquery.compiler.algebricks.VXQueryConstantValue;
+import org.apache.vxquery.compiler.algebricks.VXQueryExpressionRuntimeProvider;
+import org.apache.vxquery.compiler.algebricks.VXQueryNullWriterFactory;
+import org.apache.vxquery.compiler.algebricks.VXQueryPrinterFactoryProvider;
+import org.apache.vxquery.compiler.algebricks.prettyprint.VXQueryLogicalExpressionPrettyPrintVisitor;
+import org.apache.vxquery.compiler.rewriter.RewriteRuleset;
+import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext;
+import org.apache.vxquery.exceptions.ErrorCode;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.metadata.VXQueryMetadataProvider;
+import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFactoryProvider;
+import org.apache.vxquery.runtime.provider.VXQueryBinaryHashFunctionFamilyProvider;
+import org.apache.vxquery.types.BuiltinTypeRegistry;
+import org.apache.vxquery.types.Quantifier;
+import org.apache.vxquery.types.SequenceType;
+import org.apache.vxquery.xmlquery.ast.ModuleNode;
+import org.apache.vxquery.xmlquery.translator.XMLQueryTranslator;
 
 public class XMLQueryCompiler {
     private final XQueryCompilationListener listener;
 
     private final ICompilerFactory cFactory;
 
+    private final String hdfsConf;
+
     private LogicalOperatorPrettyPrintVisitor pprinter;
 
     private ModuleNode moduleNode;
@@ -87,17 +90,27 @@
 
     private int frameSize;
 
+    private Map<String, NodeControllerInfo> nodeControllerInfos;
+
     private String[] nodeList;
 
-    public XMLQueryCompiler(XQueryCompilationListener listener, String[] nodeList, int frameSize) {
-        this(listener, nodeList, frameSize, -1, -1, -1);
+    public XMLQueryCompiler(XQueryCompilationListener listener, Map<String, NodeControllerInfo> nodeControllerInfos,
+            int frameSize, String hdfsConf) {
+        this(listener, nodeControllerInfos, frameSize, -1, -1, -1, hdfsConf);
     }
 
-    public XMLQueryCompiler(XQueryCompilationListener listener, String[] nodeList, int frameSize,
-            int availableProcessors, long joinHashSize, long maximumDataSize) {
+    public XMLQueryCompiler(XQueryCompilationListener listener, Map<String, NodeControllerInfo> nodeControllerInfos,
+            int frameSize) {
+        this(listener, nodeControllerInfos, frameSize, -1, -1, -1, "");
+    }
+
+    public XMLQueryCompiler(XQueryCompilationListener listener, Map<String, NodeControllerInfo> nodeControllerInfos,
+            int frameSize, int availableProcessors, long joinHashSize, long maximumDataSize, String hdfsConf) {
         this.listener = listener == null ? NoopXQueryCompilationListener.INSTANCE : listener;
         this.frameSize = frameSize;
-        this.nodeList = nodeList;
+        this.nodeControllerInfos = nodeControllerInfos;
+        setNodeList();
+        this.hdfsConf = hdfsConf;
         HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(
                 new IOptimizationContextFactory() {
                     @Override
@@ -116,12 +129,12 @@
             builder.getPhysicalOptimizationConfig().setMaxFramesHybridHash((int) (joinHashSize / this.frameSize));
         }
         if (maximumDataSize > 0) {
-            builder.getPhysicalOptimizationConfig().setMaxFramesLeftInputHybridHash(
-                    (int) (maximumDataSize / this.frameSize));
+            builder.getPhysicalOptimizationConfig()
+                    .setMaxFramesLeftInputHybridHash((int) (maximumDataSize / this.frameSize));
         }
 
-        builder.getPhysicalOptimizationConfig().setMaxFramesLeftInputHybridHash(
-                (int) (60L * 1024 * 1048576 / this.frameSize));
+        builder.getPhysicalOptimizationConfig()
+                .setMaxFramesLeftInputHybridHash((int) (60L * 1024 * 1048576 / this.frameSize));
 
         builder.setLogicalRewrites(buildDefaultLogicalRewrites());
         builder.setPhysicalRewrites(buildDefaultPhysicalRewrites());
@@ -192,15 +205,26 @@
         cFactory = builder.create();
     }
 
+    /**
+     * Populate the node list with the names of the registered node controllers.
+     */
+    private void setNodeList() {
+        nodeList = new String[nodeControllerInfos.size()];
+        int index = 0;
+        for (String node : nodeControllerInfos.keySet()) {
+            nodeList[index++] = node;
+        }
+    }
+
     public void compile(String name, Reader query, CompilerControlBlock ccb, int optimizationLevel)
             throws SystemException {
         moduleNode = XMLQueryParser.parse(name, query);
         listener.notifyParseResult(moduleNode);
         module = new XMLQueryTranslator(ccb).translateModule(moduleNode);
-        pprinter = new LogicalOperatorPrettyPrintVisitor(new VXQueryLogicalExpressionPrettyPrintVisitor(
-                module.getModuleContext()));
+        pprinter = new LogicalOperatorPrettyPrintVisitor(
+                new VXQueryLogicalExpressionPrettyPrintVisitor(module.getModuleContext()));
         VXQueryMetadataProvider mdProvider = new VXQueryMetadataProvider(nodeList, ccb.getSourceFileMap(),
-                module.getModuleContext());
+                module.getModuleContext(), this.hdfsConf, nodeControllerInfos);
         compiler = cFactory.createCompiler(module.getBody(), mdProvider, 0);
         listener.notifyTranslationResult(module);
         XMLQueryTypeChecker.typeCheckModule(module);
diff --git a/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java b/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java
index 165b330..13ca921 100644
--- a/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java
+++ b/vxquery-core/src/test/java/org/apache/vxquery/xmlquery/query/SimpleXQueryTest.java
@@ -20,16 +20,19 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.zip.GZIPInputStream;
 
-import junit.framework.Assert;
-
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.vxquery.compiler.CompilerControlBlock;
 import org.apache.vxquery.context.RootStaticContextImpl;
 import org.apache.vxquery.context.StaticContextImpl;
 import org.junit.Test;
 
-import org.apache.hyracks.api.dataset.ResultSetId;
+import junit.framework.Assert;
 
 public class SimpleXQueryTest {
     @Test
@@ -96,8 +99,8 @@
 
     private static String gunzip(String dir, String filename) {
         try {
-            GZIPInputStream in = new GZIPInputStream(new BufferedInputStream(new FileInputStream(new File(dir
-                    + filename + ".gz"))));
+            GZIPInputStream in = new GZIPInputStream(
+                    new BufferedInputStream(new FileInputStream(new File(dir + filename + ".gz"))));
             File temp = File.createTempFile("vxquery", filename);
             temp.deleteOnExit();
             FileOutputStream out = new FileOutputStream(temp);
@@ -133,7 +136,11 @@
     }
 
     private static void runTestInternal(String testName, String query) throws Exception {
-        XMLQueryCompiler compiler = new XMLQueryCompiler(null, new String[] { "nc1" }, 65536);
+
+        Map<String, NodeControllerInfo> nodeControllerInfos = new HashMap<String, NodeControllerInfo>();
+        nodeControllerInfos.put("nc1", new NodeControllerInfo("nc1", null, new NetworkAddress("127.0.0.1", 0), null));
+
+        XMLQueryCompiler compiler = new XMLQueryCompiler(null, nodeControllerInfos, 65536);
         CompilerControlBlock ccb = new CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE),
                 new ResultSetId(System.nanoTime()), null);
         compiler.compile(testName, new StringReader(query), ccb, Integer.MAX_VALUE);
diff --git a/vxquery-server/src/main/resources/conf/cluster.properties b/vxquery-server/src/main/resources/conf/cluster.properties
index fd015d4..6339fd9 100644
--- a/vxquery-server/src/main/resources/conf/cluster.properties
+++ b/vxquery-server/src/main/resources/conf/cluster.properties
@@ -6,9 +6,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #      http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,4 +51,4 @@
 NCJAVA_OPTS="-server -Xmx7G -Djava.util.logging.config.file=./vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/testing_logging.properties"
 # debug option: NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
 # Yourkit option: -agentpath:/tools/yjp-2014-build-14114/bin/linux-x86-64/libyjpagent.so=port=20001"
-# Yourkit mac option: -agentpath:/Applications/YourKit_Java_Profiler.app/bin/mac/libyjpagent.jnilib=sampling
+# Yourkit mac option: -agentpath:/Applications/YourKit_Java_Profiler.app/bin/mac/libyjpagent.jnilib=sampling
\ No newline at end of file
diff --git a/vxquery-xtest/pom.xml b/vxquery-xtest/pom.xml
index 0c563e7..4cb7c60 100644
--- a/vxquery-xtest/pom.xml
+++ b/vxquery-xtest/pom.xml
@@ -161,6 +161,21 @@
         </dependency>
         
         <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-hdfs-2.x</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.mortbay.jetty</groupId>
             <artifactId>jetty</artifactId>
             <scope>compile</scope>
diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/MiniDFS.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/MiniDFS.java
new file mode 100644
index 0000000..0720baa
--- /dev/null
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/MiniDFS.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.xtest;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.mapred.JobConf;
+
+public class MiniDFS {
+
+    private MiniDFSCluster dfsCluster;
+
+    public MiniDFS() {
+
+    }
+
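+    /**
+     * Start a single-node MiniDFSCluster configured from src/test/resources/hadoop/conf and copy
+     * the GHCND test data into /tmp/vxquery-hdfs-test for the HDFS test queries.
+     */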
+    public void startHDFS() throws IOException {
+
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        JobConf conf = new JobConf();
+        String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+        int numDataNodes = 1;
+        int nameNodePort = 40000;
+
+        // cleanup artifacts created on the local file system
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        MiniDFSCluster.Builder build = new MiniDFSCluster.Builder(conf);
+        build.nameNodePort(nameNodePort);
+        build.nameNodeHttpPort(nameNodePort + 34);
+        build.numDataNodes(numDataNodes);
+        build.checkExitOnShutdown(true);
+        build.startupOption(StartupOption.REGULAR);
+        build.format(true);
+        build.waitSafeMode(true);
+        dfsCluster = build.build();
+
+        FileSystem dfs = FileSystem.get(conf);
+        String DATA_PATH = "src/test/resources/TestSources/ghcnd";
+        Path src = new Path(DATA_PATH);
+        dfs.mkdirs(new Path("/tmp"));
+        Path dest = new Path("/tmp/vxquery-hdfs-test");
+        dfs.copyFromLocalFile(src, dest);
+        if (dfs.exists(dest)) {
+            System.err.println("Test files copied to HDFS successfully");
+        }
+    }
+
+    public void shutdownHDFS() {
+        System.err.println("Tests completed. Shutting down HDFS.");
+        dfsCluster.shutdown();
+    }
+}
diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java
index 18db5c1..5b6ddff 100644
--- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestRunner.java
@@ -18,13 +18,16 @@
 import java.io.FileInputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
+import java.net.InetAddress;
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -63,6 +66,7 @@
     private NodeControllerService nc1;
     private IHyracksClientConnection hcc;
     private IHyracksDataset hds;
+    private final String publicAddress = InetAddress.getLocalHost().getHostAddress();
 
     public TestRunner(XTestOptions opts) throws Exception {
         this.opts = opts;
@@ -70,9 +74,9 @@
 
     public void open() throws Exception {
         CCConfig ccConfig = new CCConfig();
-        ccConfig.clientNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetIpAddress = publicAddress;
         ccConfig.clientNetPort = 39000;
-        ccConfig.clusterNetIpAddress = "127.0.0.1";
+        ccConfig.clusterNetIpAddress = publicAddress;
         ccConfig.clusterNetPort = 39001;
         ccConfig.profileDumpPeriod = 10000;
         File outDir = new File("target/ClusterController");
@@ -87,9 +91,9 @@
         NCConfig ncConfig1 = new NCConfig();
         ncConfig1.ccHost = "localhost";
         ncConfig1.ccPort = 39001;
-        ncConfig1.clusterNetIPAddress = "127.0.0.1";
-        ncConfig1.dataIPAddress = "127.0.0.1";
-        ncConfig1.resultIPAddress = "127.0.0.1";
+        ncConfig1.clusterNetIPAddress = publicAddress;
+        ncConfig1.dataIPAddress = publicAddress;
+        ncConfig1.resultIPAddress = publicAddress;
         ncConfig1.nodeId = "nc1";
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -115,7 +119,14 @@
 
                 VXQueryCompilationListener listener = new VXQueryCompilationListener(opts.showAST, opts.showTET,
                         opts.showOET, opts.showRP);
-                XMLQueryCompiler compiler = new XMLQueryCompiler(listener, new String[] { "nc1" }, opts.frameSize);
+
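+                // Fetch the registered node controllers from the Hyracks connection so the compiler
+                // can derive the node list and schedule HDFS splits per node.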
+                Map<String, NodeControllerInfo> nodeControllerInfos = null;
+                if (hcc != null) {
+                    nodeControllerInfos = hcc.getNodeControllerInfos();
+                }
+
+                XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeControllerInfos, opts.frameSize,
+                        opts.hdfsConf);
                 Reader in = new InputStreamReader(new FileInputStream(testCase.getXQueryFile()), "UTF-8");
                 CompilerControlBlock ccb = new CompilerControlBlock(
                         new StaticContextImpl(RootStaticContextImpl.INSTANCE),
diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java
index 854cbf8..496b74a 100644
--- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java
@@ -76,4 +76,7 @@
 
     @Option(name = "-showresult", usage = "Show query result.")
     boolean showResult;
+
+    @Option(name = "-hdfs-conf", usage = "Directory path to Hadoop configuration files")
+    String hdfsConf;
 }
diff --git a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
index 12a91a9..5411215 100644
--- a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
+++ b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
@@ -37,6 +37,7 @@
         opts.threads = 1;
         opts.showQuery = true;
         opts.showResult = true;
+        opts.hdfsConf = "src/test/resources/hadoop/conf";
         return opts;
     }
 
diff --git a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/VXQueryTest.java b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/VXQueryTest.java
index 3704d07..4d0ddc0 100644
--- a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/VXQueryTest.java
+++ b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/VXQueryTest.java
@@ -17,15 +17,19 @@
 package org.apache.vxquery.xtest;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
 public class VXQueryTest extends AbstractXQueryTest {
+    private static MiniDFS dfs;
 
     private static String VXQUERY_CATALOG = StringUtils.join(new String[] { "src", "test", "resources",
             "VXQueryCatalog.xml" }, File.separator);
@@ -52,4 +56,19 @@
         return getOptions();
     }
 
+    @BeforeClass
+    public static void setupHDFS() {
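+        // Start the local MiniDFS cluster and load the test data before any HDFS query runs.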
+        dfs = new MiniDFS();
+        try {
+            dfs.startHDFS();
+        } catch (IOException e) {
+            System.err.println(e);
+        }
+    }
+
+    @AfterClass
+    public static void shutdownHDFS() {
+        dfs.shutdownHDFS();
+    }
+
 }
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/avgHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/avgHDFS.txt
new file mode 100644
index 0000000..7ef6ffe
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/avgHDFS.txt
@@ -0,0 +1 @@
+12.5
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/countHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/countHDFS.txt
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/countHDFS.txt
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxHDFS.txt
new file mode 100644
index 0000000..dc7b54a
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxHDFS.txt
@@ -0,0 +1 @@
+33
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxvalueHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxvalueHDFS.txt
new file mode 100644
index 0000000..e37d32a
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/maxvalueHDFS.txt
@@ -0,0 +1 @@
+1000
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/minHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/minHDFS.txt
new file mode 100644
index 0000000..ea1acb6
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/minHDFS.txt
@@ -0,0 +1 @@
+11.25
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/sumHDFS.txt b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/sumHDFS.txt
new file mode 100644
index 0000000..2b82dfe
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/ExpectedTestResults/HDFS/Aggregate/sumHDFS.txt
@@ -0,0 +1 @@
+60
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/avgHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/avgHDFS.xq
new file mode 100644
index 0000000..3214b97
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/avgHDFS.xq
@@ -0,0 +1,25 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+   
+     http://www.apache.org/licenses/LICENSE-2.0
+   
+   Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the average minimum temperature.                                            :)
+fn:avg(
+    let $collection := "hdfs://tmp/vxquery-hdfs-test"
+    for $r in collection($collection)/dataCollection/data
+    where $r/dataType eq "TMIN" 
+    return $r/value
+)
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/countHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/countHDFS.xq
new file mode 100644
index 0000000..7940b03
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/countHDFS.xq
@@ -0,0 +1,25 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+   
+     http://www.apache.org/licenses/LICENSE-2.0
+   
+   Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the number of wind sensor readings.                                            :)
+fn:count(
+    let $collection := "hdfs://tmp/vxquery-hdfs-test"
+    for $r in collection($collection)/dataCollection/data
+    where $r/dataType eq "AWND" 
+    return $r/value
+)
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxHDFS.xq
new file mode 100644
index 0000000..0db1980
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxHDFS.xq
@@ -0,0 +1,25 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+   
+     http://www.apache.org/licenses/LICENSE-2.0
+   
+   Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the highest max temperature.                                            :)
+fn:max(
+    let $collection := "hdfs://tmp/vxquery-hdfs-test"
+    for $r in collection($collection)/dataCollection/data
+    where $r/dataType eq "TMAX" 
+    return $r/value
+)
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq
new file mode 100644
index 0000000..9d04916
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/maxvalueHDFS.xq
@@ -0,0 +1,23 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the max value.                                            :)
+fn:max(
+    for $r in collection-with-tag("hdfs://tmp/vxquery-hdfs-test/half_1/quarter_1/sensors", "data")/data
+    return $r/value
+)
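Note: unlike the other aggregate tests, which bind an HDFS collection URI with fn:collection() and then filter on dataType, maxvalueHDFS.xq reads a single partition directory through VXQuery's collection-with-tag(), passing the element tag ("data") as a second argument. A minimal sketch of the two access patterns used by these tests is shown below; the paths are the fixture locations from the queries above, and the exact semantics of collection-with-tag() beyond this usage are not spelled out in this patch.

    (: Pattern 1: scan the whole collection root and filter on dataType. :)
    fn:max(
        for $r in collection("hdfs://tmp/vxquery-hdfs-test")/dataCollection/data
        where $r/dataType eq "TMAX"
        return $r/value
    )

    (: Pattern 2: point at one partition and name the element tag to scan. :)
    fn:max(
        for $r in collection-with-tag("hdfs://tmp/vxquery-hdfs-test/half_1/quarter_1/sensors", "data")/data
        return $r/value
    )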
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/minHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/minHDFS.xq
new file mode 100644
index 0000000..869e222
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/minHDFS.xq
@@ -0,0 +1,25 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+   
+     http://www.apache.org/licenses/LICENSE-2.0
+   
+   Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the lowest min temperature. :)
+fn:min(
+    let $collection := "hdfs://tmp/vxquery-hdfs-test"
+    for $r in collection($collection)/dataCollection/data
+    where $r/dataType eq "TMIN" 
+    return $r/value
+)
diff --git a/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/sumHDFS.xq b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/sumHDFS.xq
new file mode 100644
index 0000000..1fdf743
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/Queries/XQuery/HDFS/Aggregate/sumHDFS.xq
@@ -0,0 +1,25 @@
+(: Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+   
+     http://www.apache.org/licenses/LICENSE-2.0
+   
+   Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License. :)
+
+(: XQuery Aggregate Query :)
+(: Find the total precipitation. :)
+fn:sum(
+    let $collection := "hdfs://tmp/vxquery-hdfs-test"
+    for $r in collection($collection)/dataCollection/data
+    where $r/dataType eq "PRCP" 
+    return $r/value
+)
\ No newline at end of file
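The count, min, max, and sum queries above share one FLWOR shape: a let clause binds the HDFS collection URI, a for clause iterates the dataCollection/data elements, a where clause filters on the GHCND dataType code (AWND, TMIN, TMAX, PRCP), and the aggregate function wraps the returned value sequence. A sketch of that shared pattern follows; $type is a hypothetical stand-in for the per-query code and is not part of the committed tests.

    (: Shared shape of the HDFS aggregate tests; swap the aggregate function
       and $type per query (e.g. fn:sum with "PRCP" reproduces sumHDFS.xq). :)
    fn:sum(
        let $collection := "hdfs://tmp/vxquery-hdfs-test"
        let $type := "PRCP"
        for $r in collection($collection)/dataCollection/data
        where $r/dataType eq $type
        return $r/value
    )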
diff --git a/vxquery-xtest/src/test/resources/VXQueryCatalog.xml b/vxquery-xtest/src/test/resources/VXQueryCatalog.xml
index 6f7acee..f75ce49 100644
--- a/vxquery-xtest/src/test/resources/VXQueryCatalog.xml
+++ b/vxquery-xtest/src/test/resources/VXQueryCatalog.xml
@@ -40,6 +40,8 @@
 <!ENTITY SingleQuery SYSTEM "cat/SingleQuery.xml">
 <!ENTITY SingleAlternateQuery SYSTEM "cat/SingleAlternateQuery.xml">
 
+<!ENTITY HDFSAggregateQueries SYSTEM "cat/HDFSAggregateQueries.xml">
+
 ]>
 <test-suite xmlns="http://www.w3.org/2005/02/query-test-XQTSCatalog"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
@@ -193,4 +195,17 @@
          &GhcndRecordsPartition4Queries;
       </test-group>
    </test-group>
+   <test-group name="HDFSAggregateQueries" featureOwner="Efi Kaltirimidou">
+      <GroupInfo>
+         <title>Aggregate Partition Queries in HDFS</title>
+         <description/>
+      </GroupInfo>
+      <test-group name="CollectionReadFromHDFSAggregateTests" featureOwner="Efi Kaltirimidou">
+         <GroupInfo>
+            <title>Aggregate HDFS Execution Tests</title>
+            <description/>
+         </GroupInfo>
+         &HDFSAggregateQueries;
+      </test-group>
+   </test-group>
 </test-suite>
\ No newline at end of file
diff --git a/vxquery-xtest/src/test/resources/cat/HDFSAggregateQueries.xml b/vxquery-xtest/src/test/resources/cat/HDFSAggregateQueries.xml
new file mode 100644
index 0000000..4f925a0
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/cat/HDFSAggregateQueries.xml
@@ -0,0 +1,53 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<test-group xmlns="http://www.w3.org/2005/02/query-test-XQTSCatalog" name="HDFSAggregateQueries" featureOwner="VXQuery">
+   <GroupInfo>
+      <title>HDFS Aggregate</title>
+      <description/>
+   </GroupInfo>
+   <test-case name="hdfs-aggregate-avg" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+      <description>Compute the average value of HDFS records returned for q02 from the weather benchmark.</description>
+      <query name="avgHDFS" date="2015-06-11"/>
+      <output-file compare="Text">avgHDFS.txt</output-file>
+   </test-case>
+   <test-case name="hdfs-aggregate-count" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+      <description>Count records in HDFS returned for q02 from the weather benchmark.</description>
+      <query name="countHDFS" date="2015-06-11"/>
+      <output-file compare="Text">countHDFS.txt</output-file>
+   </test-case>
+   <test-case name="hdfs-aggregate-min" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+      <description>Find the lowest minimum temperature in HDFS records returned for q05 from the weather benchmark.</description>
+      <query name="minHDFS" date="2015-06-11"/>
+      <output-file compare="Text">minHDFS.txt</output-file>
+   </test-case>
+   <test-case name="hdfs-aggregate-max" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+      <description>Find the highest maximum temperature in HDFS records returned for q07 from the weather benchmark.</description>
+      <query name="maxHDFS" date="2015-06-11"/>
+      <output-file compare="Text">maxHDFS.txt</output-file>
+   </test-case>
+   <test-case name="hdfs-aggregate-sum" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+      <description>Find the total precipitation in HDFS records returned for q03 from the weather benchmark.</description>
+      <query name="sumHDFS" date="2015-06-11"/>
+      <output-file compare="Text">sumHDFS.txt</output-file>
+   </test-case>
+   <test-case name="hdfs-aggregate-max-value" FilePath="HDFS/Aggregate/" Creator="Efi Kaltirimidou">
+      <description>Find the maximum value among HDFS records read with collection-with-tag from the weather benchmark data.</description>
+      <query name="maxvalueHDFS" date="2015-10-17"/>
+      <output-file compare="Text">maxvalueHDFS.txt</output-file>
+   </test-case>
+</test-group>
diff --git a/vxquery-xtest/src/test/resources/hadoop/conf/core-site.xml b/vxquery-xtest/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..b1f576c
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+    <name>fs.default.name</name>
+    <value>hdfs://localhost:40000</value>
+</property>
+<property>
+    <name>hadoop.tmp.dir</name>
+    <value>/tmp/hadoop</value>
+</property>
+
+
+</configuration>
diff --git a/vxquery-xtest/src/test/resources/hadoop/conf/hdfs-site.xml b/vxquery-xtest/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..6b6604d
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+   <name>dfs.replication</name>
+   <value>1</value>
+</property>
+
+<property>
+   <name>dfs.block.size</name>
+   <value>1048576</value>
+</property>
+
+</configuration>
diff --git a/vxquery-xtest/src/test/resources/hadoop/conf/mapred-site.xml b/vxquery-xtest/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..e15ec74
--- /dev/null
+++ b/vxquery-xtest/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+	<property>
+		<name>mapred.job.tracker</name>
+		<value>localhost:29007</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.map.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.reduce.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.max.split.size</name>
+		<value>128</value>
+	</property>
+
+</configuration>