Merge branch 'master' into feature/fasterJoin

Conflicts:
	CHANGES.md
diff --git a/CHANGES.md b/CHANGES.md
index 1b8361c..f5cb59a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,7 @@
  * [METAMODEL-1139] - Employed Java 8 functional types (java.util.function) in favor of (now deprecated) Ref, Action, Func. 
  * [METAMODEL-1140] - Allowed SalesforceDataContext without a security token.
  * [METAMODEL-1141] - Added RFC 4180 compliant CSV parsing.
+ * [METAMODEL-1144] - Optimized evaluation of conditional client-side JOIN statements.
  * [METAMODEL-1145] - Fixed bug with modelling JDBC table relationships when there are multiple keys involved in the relationship.
 
 ### Apache MetaModel 4.6.0
diff --git a/README.md b/README.md
index e5cf17a..3156d74 100644
--- a/README.md
+++ b/README.md
@@ -1,40 +1,40 @@
-## Apache MetaModel

-

-MetaModel is a data access framework, providing a common interface for exploration and querying of different types of datastores.

-

-<div>

-<img src="http://metamodel.apache.org/img/logo.png" style="float: right; margin-left: 20px;" alt="MetaModel logo" />

-</div>

-

-### Mailing lists

-

- * Developer list:  dev@metamodel.apache.org

- * User list:  user@metamodel.apache.org

- * Commits list:    commits@metamodel.apache.org

-

-### Website

-

-http://metamodel.apache.org/

-

-### Documentation

-

-Please check out our [wiki for user documentation](https://cwiki.apache.org/confluence/display/METAMODEL).

-

-### Building the code

-

-MetaModel uses maven as it's build tool. Code can be built with:

-

-```

-mvn clean install

-```

-

-### Running the integration tests

-

- 1. Copy the file 'example-metamodel-integrationtest-configuration.properties' to your user home.

- 2. Remove the 'example-' prefix from its filename

- 3. Modify the file to enable properties of the integration tests that you're interested in.

- 4. Re-run "mvn clean install".

-

-### Contributing

-

+## Apache MetaModel
+
+MetaModel is a data access framework, providing a common interface for exploration and querying of different types of datastores.
+
+<div>
+<img src="http://metamodel.apache.org/img/logo.png" style="float: right; margin-left: 20px;" alt="MetaModel logo" />
+</div>
+
+### Mailing lists
+
+ * Developer list:  dev@metamodel.apache.org
+ * User list:  user@metamodel.apache.org
+ * Commits list:    commits@metamodel.apache.org
+
+### Website
+
+http://metamodel.apache.org/
+
+### Documentation
+
+Please check out our [wiki for user documentation](https://cwiki.apache.org/confluence/display/METAMODEL).
+
+### Building the code
+
+MetaModel uses Maven as its build tool. Code can be built with:
+
+```
+mvn clean install
+```
+
+### Running the integration tests
+
+ 1. Copy the file 'example-metamodel-integrationtest-configuration.properties' to your user home.
+ 2. Remove the 'example-' prefix from its filename
+ 3. Modify the file to enable properties of the integration tests that you're interested in.
+ 4. Re-run "mvn clean install".
+
+### Contributing
+
 Please see [CONTRIBUTE.md](CONTRIBUTE.md)
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/metamodel/MetaModelHelper.java b/core/src/main/java/org/apache/metamodel/MetaModelHelper.java
index 09d47bc..a2681da 100644
--- a/core/src/main/java/org/apache/metamodel/MetaModelHelper.java
+++ b/core/src/main/java/org/apache/metamodel/MetaModelHelper.java
@@ -18,16 +18,9 @@
  */
 package org.apache.metamodel;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 import org.apache.metamodel.data.CachingDataSetHeader;
 import org.apache.metamodel.data.DataSet;
@@ -177,72 +170,96 @@
         return getCarthesianProduct(fromDataSets, new FilterItem[0]);
     }
 
+    public static DataSet getCarthesianProduct(DataSet[] fromDataSets, FilterItem... filterItems) {
+        return getCarthesianProduct(fromDataSets, Arrays.asList(filterItems));
+    }
+
     public static DataSet getCarthesianProduct(DataSet[] fromDataSets, Iterable<FilterItem> whereItems) {
+        assert (fromDataSets.length > 0);
         // First check if carthesian product is even nescesary
         if (fromDataSets.length == 1) {
             return getFiltered(fromDataSets[0], whereItems);
         }
+        // do a nested loop join, no matter what
+        Iterator<DataSet> dsIter = Arrays.asList(fromDataSets).iterator();
 
-        List<SelectItem> selectItems = new ArrayList<SelectItem>();
-        for (DataSet dataSet : fromDataSets) {
-            for (int i = 0; i < dataSet.getSelectItems().length; i++) {
-                SelectItem item = dataSet.getSelectItems()[i];
-                selectItems.add(item);
-            }
+        DataSet joined = dsIter.next();
+
+        while (dsIter.hasNext()) {
+            joined = nestedLoopJoin(dsIter.next(), joined, (whereItems));
+
         }
 
-        int selectItemOffset = 0;
-        List<Object[]> data = new ArrayList<Object[]>();
-        for (int fromDataSetIndex = 0; fromDataSetIndex < fromDataSets.length; fromDataSetIndex++) {
-            DataSet fromDataSet = fromDataSets[fromDataSetIndex];
-            SelectItem[] fromSelectItems = fromDataSet.getSelectItems();
-            if (fromDataSetIndex == 0) {
-                while (fromDataSet.next()) {
-                    Object[] values = fromDataSet.getRow().getValues();
-                    Object[] row = new Object[selectItems.size()];
-                    System.arraycopy(values, 0, row, selectItemOffset, values.length);
-                    data.add(row);
-                }
-                fromDataSet.close();
-            } else {
-                List<Object[]> fromDataRows = new ArrayList<Object[]>();
-                while (fromDataSet.next()) {
-                    fromDataRows.add(fromDataSet.getRow().getValues());
-                }
-                fromDataSet.close();
-                for (int i = 0; i < data.size(); i = i + fromDataRows.size()) {
-                    Object[] originalRow = data.get(i);
-                    data.remove(i);
-                    for (int j = 0; j < fromDataRows.size(); j++) {
-                        Object[] newRow = fromDataRows.get(j);
-                        System.arraycopy(newRow, 0, originalRow, selectItemOffset, newRow.length);
-                        data.add(i + j, originalRow.clone());
-                    }
-                }
-            }
-            selectItemOffset += fromSelectItems.length;
-        }
+        return joined;
 
-        if (data.isEmpty()) {
-            return new EmptyDataSet(selectItems);
-        }
-
-        final DataSetHeader header = new CachingDataSetHeader(selectItems);
-        final List<Row> rows = new ArrayList<Row>(data.size());
-        for (Object[] objects : data) {
-            rows.add(new DefaultRow(header, objects, null));
-        }
-
-        DataSet result = new InMemoryDataSet(header, rows);
-        if (whereItems != null) {
-            DataSet filteredResult = getFiltered(result, whereItems);
-            result = filteredResult;
-        }
-        return result;
     }
 
-    public static DataSet getCarthesianProduct(DataSet[] fromDataSets, FilterItem... filterItems) {
-        return getCarthesianProduct(fromDataSets, Arrays.asList(filterItems));
+    /**
+     * Executes a simple nested loop join. The innerLoopDs will be copied into
+     * an in-memory dataset.
+     *
+     */
+    public static InMemoryDataSet nestedLoopJoin(DataSet innerLoopDs, DataSet outerLoopDs,
+            Iterable<FilterItem> filtersIterable) {
+
+        List<FilterItem> filters = new ArrayList<>();
+        for (FilterItem fi : filtersIterable) {
+            filters.add(fi);
+        }
+        List<Row> innerRows = innerLoopDs.toRows();
+
+        List<SelectItem> allItems = new ArrayList<>(Arrays.asList(outerLoopDs.getSelectItems()));
+        allItems.addAll(Arrays.asList(innerLoopDs.getSelectItems()));
+
+        Set<FilterItem> applicableFilters = applicableFilters(filters, allItems);
+
+        DataSetHeader jointHeader = new CachingDataSetHeader(allItems);
+
+        List<Row> resultRows = new ArrayList<>();
+        for (Row outerRow : outerLoopDs) {
+            for (Row innerRow : innerRows) {
+
+                Object[] joinedRowObjects = new Object[outerRow.getValues().length + innerRow.getValues().length];
+
+                System.arraycopy(outerRow.getValues(), 0, joinedRowObjects, 0, outerRow.getValues().length);
+                System.arraycopy(innerRow.getValues(), 0, joinedRowObjects, outerRow.getValues().length, innerRow
+                        .getValues().length);
+
+                Row joinedRow = new DefaultRow(jointHeader, joinedRowObjects);
+
+                if (applicableFilters.isEmpty() || applicableFilters.stream().allMatch(fi -> fi.accept(joinedRow))) {
+                    resultRows.add(joinedRow);
+                }
+            }
+        }
+
+        return new InMemoryDataSet(jointHeader, resultRows);
+    }
+
+    /**
+     * Filters the FilterItems such that only those FilterItems are returned
+     * whose SelectItems are all contained in selectItemList
+     * 
+     * @param filters
+     * @param selectItemList
+     * @return
+     */
+    private static Set<FilterItem> applicableFilters(Collection<FilterItem> filters,
+            Collection<SelectItem> selectItemList) {
+
+        Set<SelectItem> items = new HashSet<SelectItem>(selectItemList);
+
+        return filters.stream().filter(fi -> {
+            Collection<SelectItem> fiSelectItems = new ArrayList<>();
+            fiSelectItems.add(fi.getSelectItem());
+            Object operand = fi.getOperand();
+            if (operand instanceof SelectItem) {
+                fiSelectItems.add((SelectItem) operand);
+            }
+
+            return items.containsAll(fiSelectItems);
+
+        }).collect(Collectors.toSet());
     }
 
     public static DataSet getFiltered(DataSet dataSet, Iterable<FilterItem> filterItems) {
diff --git a/core/src/test/java/org/apache/metamodel/MetaModelHelperTest.java b/core/src/test/java/org/apache/metamodel/MetaModelHelperTest.java
index 540aa95..a84cef1 100644
--- a/core/src/test/java/org/apache/metamodel/MetaModelHelperTest.java
+++ b/core/src/test/java/org/apache/metamodel/MetaModelHelperTest.java
@@ -115,21 +115,19 @@
 
     public void testSimpleCarthesianProduct() throws Exception {
         DataSet dataSet = MetaModelHelper.getCarthesianProduct(createDataSet1(), createDataSet2());
+        List<String> results = new ArrayList<String>();
 
+        while (dataSet.next()) {
+            results.add(dataSet.getRow().toString());
+        }
         assertEquals(2, dataSet.getSelectItems().length);
-        assertTrue(dataSet.next());
-        assertEquals("Row[values=[f, b]]", dataSet.getRow().toString());
-        assertTrue(dataSet.next());
-        assertEquals("Row[values=[f, a]]", dataSet.getRow().toString());
-        assertTrue(dataSet.next());
-        assertTrue(dataSet.next());
-        assertTrue(dataSet.next());
-        assertTrue(dataSet.next());
-        assertTrue(dataSet.next());
-        assertTrue(dataSet.next());
-        assertTrue(dataSet.next());
-        assertEquals("Row[values=[o, r]]", dataSet.getRow().toString());
-        assertFalse(dataSet.next());
+        assertEquals(9, results.size());
+        assertTrue(results.contains("Row[values=[f, b]]"));
+        assertTrue(results.contains("Row[values=[f, a]]"));
+        assertTrue(results.contains("Row[values=[f, r]]"));
+        assertTrue(results.contains("Row[values=[o, b]]"));
+        assertTrue(results.contains("Row[values=[o, a]]"));
+        assertTrue(results.contains("Row[values=[o, r]]"));
     }
 
     public void testTripleCarthesianProduct() throws Exception {
@@ -187,8 +185,8 @@
         data1.add(new Object[] { "f" });
         data1.add(new Object[] { "o" });
         data1.add(new Object[] { "o" });
-        DataSet dataSet1 = createDataSet(
-                new SelectItem[] { new SelectItem(new MutableColumn("foo", ColumnType.VARCHAR)) }, data1);
+        DataSet dataSet1 = createDataSet(new SelectItem[] { new SelectItem(new MutableColumn("foo",
+                ColumnType.VARCHAR)) }, data1);
         return dataSet1;
     }
 
@@ -205,8 +203,8 @@
         List<Object[]> data3 = new ArrayList<Object[]>();
         data3.add(new Object[] { "w00p", true });
         data3.add(new Object[] { "yippie", false });
-        DataSet dataSet3 = createDataSet(new SelectItem[] { new SelectItem("expression", "e"),
-                new SelectItem("webish?", "w") }, data3);
+        DataSet dataSet3 = createDataSet(new SelectItem[] { new SelectItem("expression", "e"), new SelectItem("webish?",
+                "w") }, data3);
         return dataSet3;
     }
 
@@ -216,6 +214,41 @@
         return dataSet4;
     }
 
+    private int bigDataSetSize = 3000;
+
+    /**
+     * 
+     * @return a big dataset, mocking an employee table
+     */
+    private DataSet createDataSet5() {
+        List<Object[]> data5 = new ArrayList<Object[]>();
+
+        for (int i = 0; i < bigDataSetSize; i++) {
+            data5.add(new Object[] { i, "Person_" + i, bigDataSetSize - (i + 1) });
+        }
+
+        DataSet dataSet5 = createDataSet(new SelectItem[] { new SelectItem(new MutableColumn("nr", ColumnType.BIGINT)),
+                new SelectItem(new MutableColumn("name", ColumnType.STRING)), new SelectItem(new MutableColumn("dnr",
+                        ColumnType.BIGINT)) }, data5);
+        return dataSet5;
+    }
+
+    /**
+     * 
+     * @return a big dataset, mocking a department table
+     */
+    private DataSet createDataSet6() {
+        List<Object[]> data6 = new ArrayList<Object[]>();
+
+        for (int i = 0; i < bigDataSetSize; i++) {
+            data6.add(new Object[] { i, "Department_" + i });
+        }
+
+        DataSet dataSet6 = createDataSet(new SelectItem[] { new SelectItem(new MutableColumn("nr", ColumnType.BIGINT)),
+                new SelectItem(new MutableColumn("name", ColumnType.STRING)), }, data6);
+        return dataSet6;
+    }
+
     public void testGetTables() throws Exception {
         MutableTable table1 = new MutableTable("table1");
         MutableTable table2 = new MutableTable("table2");
@@ -324,4 +357,22 @@
         assertEquals("Row[values=[1, 2, null]]", joinedDs.getRow().toString());
         assertFalse(joinedDs.next());
     }
+
+    public void testCarthesianProductScalability() {
+
+        DataSet employees = createDataSet5();
+        DataSet departmens = createDataSet6();
+
+        FilterItem fi = new FilterItem(employees.getSelectItems()[2], OperatorType.EQUALS_TO, departmens
+                .getSelectItems()[0]);
+
+        DataSet joined = MetaModelHelper.getCarthesianProduct(new DataSet[] { employees, departmens }, fi);
+        int count = 0;
+        while (joined.next()) {
+            count++;
+        }
+
+        assertTrue(count == bigDataSetSize);
+
+    }
 }
diff --git a/jdbc/src/test/java/org/apache/metamodel/jdbc/MultiJDBCDataSetTest.java b/jdbc/src/test/java/org/apache/metamodel/jdbc/MultiJDBCDataSetTest.java
new file mode 100644
index 0000000..0b60f95
--- /dev/null
+++ b/jdbc/src/test/java/org/apache/metamodel/jdbc/MultiJDBCDataSetTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.jdbc;
+
+import com.google.common.base.Stopwatch;
+import org.apache.metamodel.CompositeDataContext;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.create.CreateTable;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.drop.DropTable;
+import org.apache.metamodel.insert.InsertInto;
+import org.apache.metamodel.schema.ColumnType;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A test case using two simple H2 in-memory databases for executing a single
+ * query over both databases.
+ */
+public class MultiJDBCDataSetTest {
+
+    public static final String DRIVER_CLASS = "org.h2.Driver";
+    public static final String EMP_URL_MEMORY_DATABASE = "jdbc:h2:mem:emp";
+    public static final String DEP_URL_MEMORY_DATABASE = "jdbc:h2:mem:dep";
+
+    private Connection dep_conn;
+    private UpdateableDataContext dep_dcon;
+
+    private Connection emp_conn;
+    private UpdateableDataContext emp_dcon;
+
+    private int employeeSize = 10000;
+    private int departmentSize = 1000;
+    int employeesPerDepartment = employeeSize / departmentSize;
+
+    private static final Logger logger = LoggerFactory.getLogger(MultiJDBCDataSetTest.class);
+
+    @Before
+    public void setup() throws Exception {
+        Class.forName(DRIVER_CLASS);
+        emp_conn = DriverManager.getConnection(EMP_URL_MEMORY_DATABASE);
+        dep_conn = DriverManager.getConnection(DEP_URL_MEMORY_DATABASE);
+
+        emp_dcon = new JdbcDataContext(emp_conn);
+        dep_dcon = new JdbcDataContext(dep_conn);
+
+        emp_dcon.executeUpdate(new CreateTable(emp_dcon.getDefaultSchema(), "employee").withColumn("id").ofType(
+                ColumnType.INTEGER).asPrimaryKey().withColumn("name").ofType(ColumnType.VARCHAR).ofSize(200).withColumn(
+                        "dep_id").ofType(ColumnType.INTEGER));
+
+        for (int i = 0; i < employeeSize; i++) {
+            emp_dcon.executeUpdate(new InsertInto(emp_dcon.getDefaultSchema().getTableByName("employee")).value("id", i)
+                    .value("name", "emp" + i).value("dep_id", i % departmentSize));
+        }
+
+        dep_dcon.executeUpdate(new CreateTable(dep_dcon.getDefaultSchema(), "department").withColumn("id").ofType(
+                ColumnType.INTEGER).asPrimaryKey().withColumn("name").ofType(ColumnType.VARCHAR).ofSize(200));
+
+        for (int i = 0; i < departmentSize; i++) {
+            dep_dcon.executeUpdate(new InsertInto(dep_dcon.getDefaultSchema().getTableByName("department")).value("id",
+                    i).value("name", "dep" + i));
+        }
+
+    }
+
+    @After
+    public void tearDown() {
+        dep_dcon.executeUpdate(new DropTable("department"));
+        emp_dcon.executeUpdate(new DropTable("employee"));
+    }
+
+    @Test
+    public void testJoin() {
+        Stopwatch duration = Stopwatch.createStarted();
+        CompositeDataContext compDcon = new CompositeDataContext(this.emp_dcon, this.dep_dcon);
+
+        DataSet ds = compDcon.query().from("employee").innerJoin("department").on("dep_id", "id").selectAll().execute();
+        int rowCount = 0;
+        while (ds.next()) {
+            Row row = ds.getRow();
+            Assert.assertNotNull(row);
+            rowCount++;
+        }
+        duration.stop();
+        logger.info("Test duration was {} ms", duration.elapsed(TimeUnit.MILLISECONDS));
+
+        Assert.assertEquals(employeeSize, rowCount);
+
+    }
+
+    @Test
+    public void testSelectiveJoin() {
+        Stopwatch duration = Stopwatch.createStarted();
+        CompositeDataContext compDcon = new CompositeDataContext(this.emp_dcon, this.dep_dcon);
+
+        DataSet ds = compDcon.query().from("employee").innerJoin("department").on("dep_id", "id").selectAll().where(
+                compDcon.getTableByQualifiedLabel("department").getColumnByName("id")).eq(1).execute();
+        int rowCount = 0;
+        while (ds.next()) {
+            Row row = ds.getRow();
+            Assert.assertNotNull(row);
+            rowCount++;
+        }
+        duration.stop();
+        logger.info("Test duration was {} ms", duration.elapsed(TimeUnit.MILLISECONDS));
+
+        Assert.assertEquals(employeesPerDepartment, rowCount);
+    }
+
+}