TAJO-2177: In BroadcastJoinRule, the total volume of broadcast tables should be checked before stages are merged.
diff --git a/CHANGES b/CHANGES
index d517499..6f69000 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,9 @@
BUG FIXES
+ TAJO-2177: In BroadcastJoinRule, the total volume of broadcast tables should be
+ checked before stages are merged. (jihoon)
+
TAJO-2164: SequenceFile print wrong values with TextSerializerDeserializer.
(jaehwa)
diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TpchTestBase.java
index 70d25b4..5a1d311 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TpchTestBase.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TpchTestBase.java
@@ -55,7 +55,8 @@
}
private TpchTestBase() throws IOException {
- names = new String[] {"customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier", "empty_orders"};
+ names = new String[] {"customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier",
+ "small_supplier", "empty_orders"};
paths = new String[names.length];
for (int i = 0; i < names.length; i++) {
nameMap.put(names[i], i);
diff --git a/tajo-cluster-tests/src/test/resources/tpch/small_supplier.tbl b/tajo-cluster-tests/src/test/resources/tpch/small_supplier.tbl
new file mode 100644
index 0000000..041d3fe
--- /dev/null
+++ b/tajo-cluster-tests/src/test/resources/tpch/small_supplier.tbl
@@ -0,0 +1,6 @@
+2|Supplier#000000002|89eJ5ksX3ImxJQBvxObC,|5|15-679-861-2259|4032.68| slyly bold instructions. idle dependen|
+3|Supplier#000000003|q1,G3Pj6OjIuUYfUoH18BFTKP5aU9bEV3|1|11-383-516-1199|4192.40|blithely silent requests after the express dependencies are sl|
+4|Supplier#000000004|Bk7ah4CK8SYQTepEmvMkkgMwg|15|25-843-787-7479|4641.08|riously even requests above the exp|
+|\N|\N||\N||for null test|
+|\N|\N||\N||for null test2|
+|\N|\N||\N||for null test3|
\ No newline at end of file
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index 1ef1726..b7eb953 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -63,7 +63,7 @@
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD.varname,
"" + 5);
testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname,
- 1024 * 1024 + "");
+ "" + 1);
testingCluster.setAllTajoDaemonConfValue(
ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java
index 9c7ac58..1812ae0 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestMultipleJoinTypes.java
@@ -21,6 +21,7 @@
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.NamedTest;
import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.conf.TajoConf.ConfVars;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -103,4 +104,24 @@
public final void testInnerAndOuterWithEmpty() throws Exception {
runSimpleTests();
}
+
+ @Test
+ @Option(withExplain = true, withExplainGlobal = true, parameterized = true, sort = true)
+ @SimpleTest(queries = {
+ @QuerySpec("select * from \n" +
+ "(\n" +
+ " select * from (\n" +
+ " select l_orderkey, count(*) as cnt from lineitem group by l_orderkey having count(*) > 0\n" +
+ " ) t\n" +
+ ") l \n" +
+ "join (\n" +
+ " select * from small_supplier\n" +
+ ") a on l.l_orderkey = a.s_suppkey\n" +
+ "left outer join (\n" +
+ " select * from nation\n" +
+ ") b on l.l_orderkey = b.n_nationkey\n")
+ })
+ public final void testExceedBroadcastThreshold() throws Exception {
+ runSimpleTests();
+ }
}
diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Hash.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Hash.plan
new file mode 100644
index 0000000..1038213
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Hash.plan
@@ -0,0 +1,166 @@
+explain
+-------------------------------
+JOIN(18)(LEFT_OUTER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.b.n_nationkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(13) as default.b
+ => Targets: default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)
+ => out schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ => in schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ SCAN(11) on default.nation
+ => target list: default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ JOIN(17)(INNER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.a.s_suppkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(9) as default.a
+ => Targets: default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)
+ => out schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ => in schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ SCAN(7) on default.small_supplier
+ => target list: default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ TABLE_SUBQUERY(6) as default.l
+ => Targets: default.l.l_orderkey (INT4), default.l.cnt (INT8)
+ => out schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(4) as default.t
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ => in schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ HAVING(2) (cnt (INT8) > 0)
+ GROUP_BY(1)(l_orderkey)
+ => exprs: (count())
+ => target list: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema:{(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema:{(1) default.lineitem.l_orderkey (INT4)}
+ SCAN(0) on default.lineitem
+ => target list: default.lineitem.l_orderkey (INT4)
+ => out schema: {(1) default.lineitem.l_orderkey (INT4)}
+ => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)}
+explain
+-------------------------------
+-------------------------------------------------------------------------------
+Execution Block Graph (TERMINAL - eb_0000000000000_0000_000007)
+-------------------------------------------------------------------------------
+|-eb_0000000000000_0000_000007
+ |-eb_0000000000000_0000_000006
+ |-eb_0000000000000_0000_000001
+ |-eb_0000000000000_0000_000005
+-------------------------------------------------------------------------------
+Order of Execution
+-------------------------------------------------------------------------------
+1: eb_0000000000000_0000_000005
+2: eb_0000000000000_0000_000001
+3: eb_0000000000000_0000_000006
+4: eb_0000000000000_0000_000007
+-------------------------------------------------------------------------------
+
+=======================================================
+Block Id: eb_0000000000000_0000_000005 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 5 => 6 (type=HASH_SHUFFLE, key=default.b.n_nationkey (INT4), num=32)
+
+TABLE_SUBQUERY(13) as default.b
+ => Targets: default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)
+ => out schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ => in schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ PROJECTION(12)
+ => Targets: default.nation.n_nationkey (INT4), default.nation.n_name (TEXT), default.nation.n_regionkey (INT4), default.nation.n_comment (TEXT)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ SCAN(11) on default.nation
+ => target list: default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000001 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 1 => 6 (type=HASH_SHUFFLE, key=default.lineitem.l_orderkey (INT4), num=32)
+
+GROUP_BY(20)(l_orderkey)
+ => exprs: (count())
+ => target list: default.lineitem.l_orderkey (INT4), ?count_6 (INT8)
+ => out schema:{(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ => in schema:{(1) default.lineitem.l_orderkey (INT4)}
+ SCAN(0) on default.lineitem
+ => target list: default.lineitem.l_orderkey (INT4)
+ => out schema: {(1) default.lineitem.l_orderkey (INT4)}
+ => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000006 [ROOT]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 5 => 6 (type=HASH_SHUFFLE, key=default.b.n_nationkey (INT4), num=32)
+[q_0000000000000_0000] 1 => 6 (type=HASH_SHUFFLE, key=default.lineitem.l_orderkey (INT4), num=32)
+
+[Enforcers]
+ 0: type=Broadcast, tables=default.small_supplier
+
+JOIN(18)(LEFT_OUTER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.b.n_nationkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ SCAN(25) on eb_0000000000000_0000_000005
+ => out schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ => in schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ JOIN(17)(INNER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.a.s_suppkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(9) as default.a
+ => Targets: default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)
+ => out schema: {(7) default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)}
+ => in schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ PROJECTION(8)
+ => Targets: default.small_supplier.s_suppkey (INT4), default.small_supplier.s_name (TEXT), default.small_supplier.s_address (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_comment (TEXT)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ SCAN(7) on default.small_supplier
+ => target list: default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ TABLE_SUBQUERY(6) as default.l
+ => Targets: default.l.l_orderkey (INT4), default.l.cnt (INT8)
+ => out schema: {(2) default.l.l_orderkey (INT4), default.l.cnt (INT8)}
+ => in schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ PROJECTION(5)
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ => in schema: {(2) default.t.l_orderkey (INT4), default.t.cnt (INT8)}
+ TABLE_SUBQUERY(4) as default.t
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.l_orderkey (INT4), default.t.cnt (INT8)}
+ => in schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ PROJECTION(3)
+ => Targets: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema: {(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema: {(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ HAVING(2) (cnt (INT8) > 0)
+ GROUP_BY(1)(l_orderkey)
+ => exprs: (count(?count_6 (INT8)))
+ => target list: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema:{(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema:{(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ SCAN(21) on eb_0000000000000_0000_000001
+ => out schema: {(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ => in schema: {(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000007 [TERMINAL]
+=======================================================
diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Hash_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Hash_NoBroadcast.plan
new file mode 100644
index 0000000..2ec81d8
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Hash_NoBroadcast.plan
@@ -0,0 +1,209 @@
+explain
+-------------------------------
+JOIN(18)(LEFT_OUTER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.b.n_nationkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(13) as default.b
+ => Targets: default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)
+ => out schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ => in schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ SCAN(11) on default.nation
+ => target list: default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ JOIN(17)(INNER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.a.s_suppkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(9) as default.a
+ => Targets: default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)
+ => out schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ => in schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ SCAN(7) on default.small_supplier
+ => target list: default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ TABLE_SUBQUERY(6) as default.l
+ => Targets: default.l.l_orderkey (INT4), default.l.cnt (INT8)
+ => out schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(4) as default.t
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ => in schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ HAVING(2) (cnt (INT8) > 0)
+ GROUP_BY(1)(l_orderkey)
+ => exprs: (count())
+ => target list: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema:{(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema:{(1) default.lineitem.l_orderkey (INT4)}
+ SCAN(0) on default.lineitem
+ => target list: default.lineitem.l_orderkey (INT4)
+ => out schema: {(1) default.lineitem.l_orderkey (INT4)}
+ => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)}
+explain
+-------------------------------
+-------------------------------------------------------------------------------
+Execution Block Graph (TERMINAL - eb_0000000000000_0000_000007)
+-------------------------------------------------------------------------------
+|-eb_0000000000000_0000_000007
+ |-eb_0000000000000_0000_000006
+ |-eb_0000000000000_0000_000005
+ |-eb_0000000000000_0000_000004
+ |-eb_0000000000000_0000_000003
+ |-eb_0000000000000_0000_000002
+ |-eb_0000000000000_0000_000001
+-------------------------------------------------------------------------------
+Order of Execution
+-------------------------------------------------------------------------------
+1: eb_0000000000000_0000_000001
+2: eb_0000000000000_0000_000002
+3: eb_0000000000000_0000_000003
+4: eb_0000000000000_0000_000004
+5: eb_0000000000000_0000_000005
+6: eb_0000000000000_0000_000006
+7: eb_0000000000000_0000_000007
+-------------------------------------------------------------------------------
+
+=======================================================
+Block Id: eb_0000000000000_0000_000001 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 1 => 2 (type=HASH_SHUFFLE, key=default.lineitem.l_orderkey (INT4), num=32)
+
+GROUP_BY(20)(l_orderkey)
+ => exprs: (count())
+ => target list: default.lineitem.l_orderkey (INT4), ?count_6 (INT8)
+ => out schema:{(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ => in schema:{(1) default.lineitem.l_orderkey (INT4)}
+ SCAN(0) on default.lineitem
+ => target list: default.lineitem.l_orderkey (INT4)
+ => out schema: {(1) default.lineitem.l_orderkey (INT4)}
+ => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000002 [INTERMEDIATE]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 1 => 2 (type=HASH_SHUFFLE, key=default.lineitem.l_orderkey (INT4), num=32)
+
+[Outgoing]
+[q_0000000000000_0000] 2 => 4 (type=HASH_SHUFFLE, key=default.l.l_orderkey (INT4), num=32)
+
+TABLE_SUBQUERY(6) as default.l
+ => Targets: default.l.l_orderkey (INT4), default.l.cnt (INT8)
+ => out schema: {(2) default.l.l_orderkey (INT4), default.l.cnt (INT8)}
+ => in schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ PROJECTION(5)
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ => in schema: {(2) default.t.l_orderkey (INT4), default.t.cnt (INT8)}
+ TABLE_SUBQUERY(4) as default.t
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.l_orderkey (INT4), default.t.cnt (INT8)}
+ => in schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ PROJECTION(3)
+ => Targets: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema: {(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema: {(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ HAVING(2) (cnt (INT8) > 0)
+ GROUP_BY(1)(l_orderkey)
+ => exprs: (count(?count_6 (INT8)))
+ => target list: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema:{(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema:{(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ SCAN(21) on eb_0000000000000_0000_000001
+ => out schema: {(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ => in schema: {(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000003 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 3 => 4 (type=HASH_SHUFFLE, key=default.a.s_suppkey (INT4), num=32)
+
+TABLE_SUBQUERY(9) as default.a
+ => Targets: default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)
+ => out schema: {(7) default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)}
+ => in schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ PROJECTION(8)
+ => Targets: default.small_supplier.s_suppkey (INT4), default.small_supplier.s_name (TEXT), default.small_supplier.s_address (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_comment (TEXT)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ SCAN(7) on default.small_supplier
+ => target list: default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000004 [INTERMEDIATE]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 2 => 4 (type=HASH_SHUFFLE, key=default.l.l_orderkey (INT4), num=32)
+[q_0000000000000_0000] 3 => 4 (type=HASH_SHUFFLE, key=default.a.s_suppkey (INT4), num=32)
+
+[Outgoing]
+[q_0000000000000_0000] 4 => 6 (type=HASH_SHUFFLE, key=default.l.l_orderkey (INT4), num=32)
+
+JOIN(17)(INNER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.a.s_suppkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ SCAN(23) on eb_0000000000000_0000_000003
+ => out schema: {(7) default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)}
+ => in schema: {(7) default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)}
+ SCAN(22) on eb_0000000000000_0000_000002
+ => out schema: {(2) default.l.l_orderkey (INT4), default.l.cnt (INT8)}
+ => in schema: {(2) default.l.l_orderkey (INT4), default.l.cnt (INT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000005 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 5 => 6 (type=HASH_SHUFFLE, key=default.b.n_nationkey (INT4), num=32)
+
+TABLE_SUBQUERY(13) as default.b
+ => Targets: default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)
+ => out schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ => in schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ PROJECTION(12)
+ => Targets: default.nation.n_nationkey (INT4), default.nation.n_name (TEXT), default.nation.n_regionkey (INT4), default.nation.n_comment (TEXT)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ SCAN(11) on default.nation
+ => target list: default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000006 [ROOT]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 4 => 6 (type=HASH_SHUFFLE, key=default.l.l_orderkey (INT4), num=32)
+[q_0000000000000_0000] 5 => 6 (type=HASH_SHUFFLE, key=default.b.n_nationkey (INT4), num=32)
+
+JOIN(18)(LEFT_OUTER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.b.n_nationkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ SCAN(25) on eb_0000000000000_0000_000005
+ => out schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ => in schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ SCAN(24) on eb_0000000000000_0000_000004
+ => out schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000007 [TERMINAL]
+=======================================================
diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Sort.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Sort.plan
new file mode 100644
index 0000000..1038213
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Sort.plan
@@ -0,0 +1,166 @@
+explain
+-------------------------------
+JOIN(18)(LEFT_OUTER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.b.n_nationkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(13) as default.b
+ => Targets: default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)
+ => out schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ => in schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ SCAN(11) on default.nation
+ => target list: default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ JOIN(17)(INNER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.a.s_suppkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(9) as default.a
+ => Targets: default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)
+ => out schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ => in schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ SCAN(7) on default.small_supplier
+ => target list: default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ TABLE_SUBQUERY(6) as default.l
+ => Targets: default.l.l_orderkey (INT4), default.l.cnt (INT8)
+ => out schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(4) as default.t
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ => in schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ HAVING(2) (cnt (INT8) > 0)
+ GROUP_BY(1)(l_orderkey)
+ => exprs: (count())
+ => target list: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema:{(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema:{(1) default.lineitem.l_orderkey (INT4)}
+ SCAN(0) on default.lineitem
+ => target list: default.lineitem.l_orderkey (INT4)
+ => out schema: {(1) default.lineitem.l_orderkey (INT4)}
+ => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)}
+explain
+-------------------------------
+-------------------------------------------------------------------------------
+Execution Block Graph (TERMINAL - eb_0000000000000_0000_000007)
+-------------------------------------------------------------------------------
+|-eb_0000000000000_0000_000007
+ |-eb_0000000000000_0000_000006
+ |-eb_0000000000000_0000_000001
+ |-eb_0000000000000_0000_000005
+-------------------------------------------------------------------------------
+Order of Execution
+-------------------------------------------------------------------------------
+1: eb_0000000000000_0000_000005
+2: eb_0000000000000_0000_000001
+3: eb_0000000000000_0000_000006
+4: eb_0000000000000_0000_000007
+-------------------------------------------------------------------------------
+
+=======================================================
+Block Id: eb_0000000000000_0000_000005 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 5 => 6 (type=HASH_SHUFFLE, key=default.b.n_nationkey (INT4), num=32)
+
+TABLE_SUBQUERY(13) as default.b
+ => Targets: default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)
+ => out schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ => in schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ PROJECTION(12)
+ => Targets: default.nation.n_nationkey (INT4), default.nation.n_name (TEXT), default.nation.n_regionkey (INT4), default.nation.n_comment (TEXT)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ SCAN(11) on default.nation
+ => target list: default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000001 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 1 => 6 (type=HASH_SHUFFLE, key=default.lineitem.l_orderkey (INT4), num=32)
+
+GROUP_BY(20)(l_orderkey)
+ => exprs: (count())
+ => target list: default.lineitem.l_orderkey (INT4), ?count_6 (INT8)
+ => out schema:{(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ => in schema:{(1) default.lineitem.l_orderkey (INT4)}
+ SCAN(0) on default.lineitem
+ => target list: default.lineitem.l_orderkey (INT4)
+ => out schema: {(1) default.lineitem.l_orderkey (INT4)}
+ => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000006 [ROOT]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 5 => 6 (type=HASH_SHUFFLE, key=default.b.n_nationkey (INT4), num=32)
+[q_0000000000000_0000] 1 => 6 (type=HASH_SHUFFLE, key=default.lineitem.l_orderkey (INT4), num=32)
+
+[Enforcers]
+ 0: type=Broadcast, tables=default.small_supplier
+
+JOIN(18)(LEFT_OUTER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.b.n_nationkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ SCAN(25) on eb_0000000000000_0000_000005
+ => out schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ => in schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ JOIN(17)(INNER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.a.s_suppkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(9) as default.a
+ => Targets: default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)
+ => out schema: {(7) default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)}
+ => in schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ PROJECTION(8)
+ => Targets: default.small_supplier.s_suppkey (INT4), default.small_supplier.s_name (TEXT), default.small_supplier.s_address (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_comment (TEXT)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ SCAN(7) on default.small_supplier
+ => target list: default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ TABLE_SUBQUERY(6) as default.l
+ => Targets: default.l.l_orderkey (INT4), default.l.cnt (INT8)
+ => out schema: {(2) default.l.l_orderkey (INT4), default.l.cnt (INT8)}
+ => in schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ PROJECTION(5)
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ => in schema: {(2) default.t.l_orderkey (INT4), default.t.cnt (INT8)}
+ TABLE_SUBQUERY(4) as default.t
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.l_orderkey (INT4), default.t.cnt (INT8)}
+ => in schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ PROJECTION(3)
+ => Targets: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema: {(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema: {(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ HAVING(2) (cnt (INT8) > 0)
+ GROUP_BY(1)(l_orderkey)
+ => exprs: (count(?count_6 (INT8)))
+ => target list: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema:{(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema:{(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ SCAN(21) on eb_0000000000000_0000_000001
+ => out schema: {(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ => in schema: {(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000007 [TERMINAL]
+=======================================================
diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Sort_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Sort_NoBroadcast.plan
new file mode 100644
index 0000000..2ec81d8
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.Sort_NoBroadcast.plan
@@ -0,0 +1,209 @@
+explain
+-------------------------------
+JOIN(18)(LEFT_OUTER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.b.n_nationkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(13) as default.b
+ => Targets: default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)
+ => out schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ => in schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ SCAN(11) on default.nation
+ => target list: default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ JOIN(17)(INNER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.a.s_suppkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(9) as default.a
+ => Targets: default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)
+ => out schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ => in schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ SCAN(7) on default.small_supplier
+ => target list: default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ TABLE_SUBQUERY(6) as default.l
+ => Targets: default.l.l_orderkey (INT4), default.l.cnt (INT8)
+ => out schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ TABLE_SUBQUERY(4) as default.t
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ => in schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ HAVING(2) (cnt (INT8) > 0)
+ GROUP_BY(1)(l_orderkey)
+ => exprs: (count())
+ => target list: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema:{(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema:{(1) default.lineitem.l_orderkey (INT4)}
+ SCAN(0) on default.lineitem
+ => target list: default.lineitem.l_orderkey (INT4)
+ => out schema: {(1) default.lineitem.l_orderkey (INT4)}
+ => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)}
+explain
+-------------------------------
+-------------------------------------------------------------------------------
+Execution Block Graph (TERMINAL - eb_0000000000000_0000_000007)
+-------------------------------------------------------------------------------
+|-eb_0000000000000_0000_000007
+ |-eb_0000000000000_0000_000006
+ |-eb_0000000000000_0000_000005
+ |-eb_0000000000000_0000_000004
+ |-eb_0000000000000_0000_000003
+ |-eb_0000000000000_0000_000002
+ |-eb_0000000000000_0000_000001
+-------------------------------------------------------------------------------
+Order of Execution
+-------------------------------------------------------------------------------
+1: eb_0000000000000_0000_000001
+2: eb_0000000000000_0000_000002
+3: eb_0000000000000_0000_000003
+4: eb_0000000000000_0000_000004
+5: eb_0000000000000_0000_000005
+6: eb_0000000000000_0000_000006
+7: eb_0000000000000_0000_000007
+-------------------------------------------------------------------------------
+
+=======================================================
+Block Id: eb_0000000000000_0000_000001 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 1 => 2 (type=HASH_SHUFFLE, key=default.lineitem.l_orderkey (INT4), num=32)
+
+GROUP_BY(20)(l_orderkey)
+ => exprs: (count())
+ => target list: default.lineitem.l_orderkey (INT4), ?count_6 (INT8)
+ => out schema:{(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ => in schema:{(1) default.lineitem.l_orderkey (INT4)}
+ SCAN(0) on default.lineitem
+ => target list: default.lineitem.l_orderkey (INT4)
+ => out schema: {(1) default.lineitem.l_orderkey (INT4)}
+ => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000002 [INTERMEDIATE]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 1 => 2 (type=HASH_SHUFFLE, key=default.lineitem.l_orderkey (INT4), num=32)
+
+[Outgoing]
+[q_0000000000000_0000] 2 => 4 (type=HASH_SHUFFLE, key=default.l.l_orderkey (INT4), num=32)
+
+TABLE_SUBQUERY(6) as default.l
+ => Targets: default.l.l_orderkey (INT4), default.l.cnt (INT8)
+ => out schema: {(2) default.l.l_orderkey (INT4), default.l.cnt (INT8)}
+ => in schema: {(2) default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ PROJECTION(5)
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ => in schema: {(2) default.t.l_orderkey (INT4), default.t.cnt (INT8)}
+ TABLE_SUBQUERY(4) as default.t
+ => Targets: default.t.l_orderkey (INT4), default.t.cnt (INT8)
+ => out schema: {(2) default.t.l_orderkey (INT4), default.t.cnt (INT8)}
+ => in schema: {(2) default.t.cnt (INT8), default.t.l_orderkey (INT4)}
+ PROJECTION(3)
+ => Targets: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema: {(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema: {(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ HAVING(2) (cnt (INT8) > 0)
+ GROUP_BY(1)(l_orderkey)
+ => exprs: (count(?count_6 (INT8)))
+ => target list: default.lineitem.l_orderkey (INT4), cnt (INT8)
+ => out schema:{(2) cnt (INT8), default.lineitem.l_orderkey (INT4)}
+ => in schema:{(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ SCAN(21) on eb_0000000000000_0000_000001
+ => out schema: {(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+ => in schema: {(2) default.lineitem.l_orderkey (INT4), ?count_6 (INT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000003 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 3 => 4 (type=HASH_SHUFFLE, key=default.a.s_suppkey (INT4), num=32)
+
+TABLE_SUBQUERY(9) as default.a
+ => Targets: default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)
+ => out schema: {(7) default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)}
+ => in schema: {(7) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4)}
+ PROJECTION(8)
+ => Targets: default.small_supplier.s_suppkey (INT4), default.small_supplier.s_name (TEXT), default.small_supplier.s_address (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_comment (TEXT)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ SCAN(7) on default.small_supplier
+ => target list: default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)
+ => out schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+ => in schema: {(7) default.small_supplier.s_acctbal (FLOAT8), default.small_supplier.s_address (TEXT), default.small_supplier.s_comment (TEXT), default.small_supplier.s_name (TEXT), default.small_supplier.s_nationkey (INT4), default.small_supplier.s_phone (TEXT), default.small_supplier.s_suppkey (INT4)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000004 [INTERMEDIATE]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 2 => 4 (type=HASH_SHUFFLE, key=default.l.l_orderkey (INT4), num=32)
+[q_0000000000000_0000] 3 => 4 (type=HASH_SHUFFLE, key=default.a.s_suppkey (INT4), num=32)
+
+[Outgoing]
+[q_0000000000000_0000] 4 => 6 (type=HASH_SHUFFLE, key=default.l.l_orderkey (INT4), num=32)
+
+JOIN(17)(INNER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.a.s_suppkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ SCAN(23) on eb_0000000000000_0000_000003
+ => out schema: {(7) default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)}
+ => in schema: {(7) default.a.s_suppkey (INT4), default.a.s_name (TEXT), default.a.s_address (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_acctbal (FLOAT8), default.a.s_comment (TEXT)}
+ SCAN(22) on eb_0000000000000_0000_000002
+ => out schema: {(2) default.l.l_orderkey (INT4), default.l.cnt (INT8)}
+ => in schema: {(2) default.l.l_orderkey (INT4), default.l.cnt (INT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000005 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 5 => 6 (type=HASH_SHUFFLE, key=default.b.n_nationkey (INT4), num=32)
+
+TABLE_SUBQUERY(13) as default.b
+ => Targets: default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)
+ => out schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ => in schema: {(4) default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4)}
+ PROJECTION(12)
+ => Targets: default.nation.n_nationkey (INT4), default.nation.n_name (TEXT), default.nation.n_regionkey (INT4), default.nation.n_comment (TEXT)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ SCAN(11) on default.nation
+ => target list: default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)
+ => out schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+ => in schema: {(4) default.nation.n_comment (TEXT), default.nation.n_name (TEXT), default.nation.n_nationkey (INT4), default.nation.n_regionkey (INT4)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000006 [ROOT]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 4 => 6 (type=HASH_SHUFFLE, key=default.l.l_orderkey (INT4), num=32)
+[q_0000000000000_0000] 5 => 6 (type=HASH_SHUFFLE, key=default.b.n_nationkey (INT4), num=32)
+
+JOIN(18)(LEFT_OUTER)
+ => Join Cond: default.l.l_orderkey (INT4) = default.b.n_nationkey (INT4)
+ => target list: default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)
+ => out schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(13) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.b.n_comment (TEXT), default.b.n_name (TEXT), default.b.n_nationkey (INT4), default.b.n_regionkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ SCAN(25) on eb_0000000000000_0000_000005
+ => out schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ => in schema: {(4) default.b.n_nationkey (INT4), default.b.n_name (TEXT), default.b.n_regionkey (INT4), default.b.n_comment (TEXT)}
+ SCAN(24) on eb_0000000000000_0000_000004
+ => out schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+ => in schema: {(9) default.a.s_acctbal (FLOAT8), default.a.s_address (TEXT), default.a.s_comment (TEXT), default.a.s_name (TEXT), default.a.s_nationkey (INT4), default.a.s_phone (TEXT), default.a.s_suppkey (INT4), default.l.cnt (INT8), default.l.l_orderkey (INT4)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000007 [TERMINAL]
+=======================================================
diff --git a/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.result b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.result
new file mode 100644
index 0000000..c8ad1df
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/results/TestMultipleJoinTypes/testExceedBroadcastThreshold.1.result
@@ -0,0 +1,4 @@
+l_orderkey,cnt,s_suppkey,s_name,s_address,s_nationkey,s_phone,s_acctbal,s_comment,n_nationkey,n_name,n_regionkey,n_comment
+-------------------------------
+2,1,2,Supplier#000000002,89eJ5ksX3ImxJQBvxObC,,5,15-679-861-2259,4032.68, slyly bold instructions. idle dependen,2,BRAZIL,1,y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
+3,2,3,Supplier#000000003,q1,G3Pj6OjIuUYfUoH18BFTKP5aU9bEV3,1,11-383-516-1199,4192.4,blithely silent requests after the express dependencies are sl,3,CANADA,1,eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
index 148092a..ab68e97 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -49,6 +49,7 @@
public static final String ORDERS = "orders";
public static final String PARTSUPP = "partsupp";
public static final String SUPPLIER = "supplier";
+ public static final String SUPPLIER_COPY = "small_supplier";
public static final String EMPTY_ORDERS = "empty_orders";
@@ -64,6 +65,7 @@
tableVolumes.put(ORDERS, 171952161L);
tableVolumes.put(PARTSUPP, 118984616L);
tableVolumes.put(SUPPLIER, 1409184L);
+ tableVolumes.put(SUPPLIER_COPY, 5120L);
tableVolumes.put(EMPTY_ORDERS, 0L);
}
@@ -169,6 +171,7 @@
.addColumn("s_acctbal", Type.FLOAT8) // 5
.addColumn("s_comment", Type.TEXT); // 6
schemas.put(SUPPLIER, supplier);
+ schemas.put(SUPPLIER_COPY, supplier);
}
public void loadOutSchema() {
@@ -198,6 +201,7 @@
loadTable(ORDERS);
loadTable(PARTSUPP) ;
loadTable(SUPPLIER);
+ loadTable(SUPPLIER_COPY);
loadTable(EMPTY_ORDERS);
}
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 02e6609..4534235 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -39,7 +39,7 @@
// Actual ScanNode's ExecutionBlockId -> Delegated ScanNode's ExecutionBlockId.
private Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = new HashMap<ExecutionBlockId, ExecutionBlockId>();
- private Map<String, ScanNode> broadcastRelations = TUtil.newHashMap();
+ private Map<String, ScanNode> broadcastRelations = TUtil.newHashMap(); // map of table name and corresponding scan node
private PlanContext planContext;
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
index aff5038..a9bedb6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
@@ -21,6 +21,7 @@
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.OverridableConf;
import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
@@ -109,7 +110,7 @@
@Override
public int compare(ScanNode o1, ScanNode o2) {
- long compare = GlobalPlanRewriteUtil.getTableVolume(o1) - GlobalPlanRewriteUtil.getTableVolume(o2);
+ long compare = PlannerUtil.getTableVolume(o1) - PlannerUtil.getTableVolume(o2);
if (compare == 0) {
return 0;
} else if (compare > 0) {
@@ -157,7 +158,7 @@
private final long thresholdForCrossJoin;
private final boolean broadcastForNonCrossJoinEnabled;
private final GlobalPlanRewriteUtil.ParentFinder parentFinder;
- private final Map<ExecutionBlockId, Long> estimatedEbOutputSize = TUtil.newHashMap();
+ private final Map<String, Long> estimatedEbOutputSize = new HashMap<>(); // map of table name and its volume
public BroadcastJoinPlanBuilder(MasterPlan plan, RelationSizeComparator relationSizeComparator,
GlobalPlanRewriteUtil.ParentFinder parentFinder,
@@ -192,9 +193,9 @@
if (!current.isPreservedRow()) {
long totalVolume = 0;
for (ScanNode scanNode : current.getScanNodes()) {
- totalVolume += GlobalPlanRewriteUtil.getTableVolume(scanNode);
+ totalVolume += PlannerUtil.getTableVolume(scanNode);
}
- estimatedEbOutputSize.put(current.getId(), totalVolume);
+ estimatedEbOutputSize.put(current.getId().toString(), totalVolume);
}
}
@@ -223,21 +224,51 @@
for (ExecutionBlock child : childs) {
if (!child.isPreservedRow()) {
updateBroadcastableRelForChildEb(child, joinType);
+ // Mark the scan node for the child eb as broadcastable to figure out whether the current and child ebs can be merged.
updateInputBasedOnChildEb(child, current);
}
}
if (current.hasBroadcastRelation()) {
+ long broadcastThreshold = joinType.equals(JoinType.CROSS) ?
+ thresholdForCrossJoin : thresholdForNonCrossJoin;
+
// The current execution block and its every child are able to be merged.
for (ExecutionBlock child : childs) {
addUnionNodeIfNecessary(unionScanMap, plan, child, current);
- mergeTwoPhaseJoinIfPossible(plan, child, current);
+
+ // First check that two stages can be merged.
+ // If the total volume of broadcast candidates of the merged stage exceeds the threshold,
+ // these stages cannot be merged.
+ //
+ // Note: this is a greedy approach, and there may be a better solution that finds a more optimized broadcast
+ // join plan. For example, it would be better to split the merged stage by marking the largest broadcast
+ // candidate as not being broadcast because it can reduce the network cost a little bit.
+ // However, the benefit does not look large (every broadcast candidate is very small), so the simple greedy
+ // solution is used here.
+ if (getTotalVolumeOfBroadcastableRelations(current) +
+ getTotalVolumeOfBroadcastableRelations(child)
+ > broadcastThreshold) {
+ // If a scan node for the child eb is marked as a broadcast candidate, mark it as not being broadcasted
+ // again.
+ List<ScanNode> notBroadcastable = new ArrayList<>();
+ for (ScanNode eachScan : current.getBroadcastRelations()) {
+ if (eachScan.getTableName().equals(child.getId().toString())) {
+ notBroadcastable.add(eachScan);
+ }
+ }
+
+ for (ScanNode eachScan : notBroadcastable) {
+ current.removeBroadcastRelation(eachScan);
+ }
+ } else {
+ mergeTwoPhaseJoinIfPossible(plan, child, current);
+ }
+
}
- checkTotalSizeOfBroadcastableRelations(current);
-
long outputVolume = estimateOutputVolume(current);
- estimatedEbOutputSize.put(current.getId(), outputVolume);
+ estimatedEbOutputSize.put(current.getId().toString(), outputVolume);
}
} else {
List<ScanNode> relations = TUtil.newList(current.getBroadcastRelations());
@@ -265,7 +296,7 @@
private void updateBroadcastableRelForChildEb(ExecutionBlock child, JoinType joinType) {
long threshold = joinType == JoinType.CROSS ? thresholdForCrossJoin : thresholdForNonCrossJoin;
for (ScanNode scanNode : child.getScanNodes()) {
- long volume = GlobalPlanRewriteUtil.getTableVolume(scanNode);
+ long volume = PlannerUtil.getTableVolume(scanNode);
if (volume >= 0 && volume <= threshold) {
// If the child eb is already visited, the below line may update its broadcast relations.
// Furthermore, this operation might mark the preserved-row relation as the broadcast relation with outer join.
@@ -392,7 +423,7 @@
thresholdForCrossJoin : thresholdForNonCrossJoin;
int i;
for (i = 0; i < broadcastCandidates.size(); i++) {
- long volumeOfCandidate = GlobalPlanRewriteUtil.getTableVolume(broadcastCandidates.get(i));
+ long volumeOfCandidate = PlannerUtil.getTableVolume(broadcastCandidates.get(i));
if (totalBroadcastVolume + volumeOfCandidate > largeThreshold) {
break;
}
@@ -405,6 +436,20 @@
}
}
+ private long getTotalVolumeOfBroadcastableRelations(ExecutionBlock block) {
+ long sum = 0;
+
+ for (ScanNode eachScan : block.getBroadcastRelations()) {
+ if (!estimatedEbOutputSize.containsKey(eachScan.getTableName())) {
+ long volume = PlannerUtil.getTableVolume(eachScan);
+ sum += volume == TajoConstants.UNKNOWN_LENGTH ?
+ Integer.MAX_VALUE : volume; // Use Integer.MAX_VALUE for unknown volumes to prevent overflow of the long sum
+ }
+ }
+
+ return sum;
+ }
+
private void updateScanOfParentAsBroadcastable(MasterPlan plan, ExecutionBlock current, ExecutionBlock parent) {
if (parent != null && !plan.isTerminal(parent)) {
ScanNode scanForCurrent = findScanForChildEb(current, parent);
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
index b13cb0f..fd9d022 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
@@ -90,27 +90,6 @@
}
/**
- * Get a volume of a table of a partitioned table
- * @param scanNode ScanNode corresponding to a table
- * @return table volume (bytes)
- */
- public static long getTableVolume(ScanNode scanNode) {
- if (scanNode.getTableDesc().hasStats()) {
- long scanBytes = scanNode.getTableDesc().getStats().getNumBytes();
- if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
- PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode;
- if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
- scanBytes = 0L;
- }
- }
-
- return scanBytes;
- } else {
- return -1;
- }
- }
-
- /**
* It calculates the total volume of all descendent relation nodes.
*/
public static long computeDescendentVolume(LogicalNode node) {
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index cb3f9b8..6979a64 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.algebra.*;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.*;
@@ -977,4 +978,25 @@
}
return false;
}
+
+ /**
+ * Get a volume of a table or a partitioned table
+ * @param scanNode ScanNode corresponding to a table
+ * @return table volume (bytes)
+ */
+ public static long getTableVolume(ScanNode scanNode) {
+ if (scanNode.getTableDesc().hasStats()) {
+ long scanBytes = scanNode.getTableDesc().getStats().getNumBytes();
+ if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
+ PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode;
+ if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
+ scanBytes = 0L;
+ }
+ }
+
+ return scanBytes;
+ } else {
+ return TajoConstants.UNKNOWN_LENGTH;
+ }
+ }
}
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
index 7442561..eb4530d 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
@@ -24,6 +24,7 @@
import org.apache.tajo.exception.TooLargeInputForCrossJoinException;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.PostLogicalPlanVerifier.Context;
import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
import org.apache.tajo.unit.StorageUnit;
@@ -90,7 +91,7 @@
List<String> largeRelationNames = TUtil.newList();
if (isSimpleRelationNode(node.getLeftChild())) {
- if (getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin * StorageUnit.KB) {
+ if (PlannerUtil.getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin * StorageUnit.KB) {
crossJoinAllowed = true;
} else {
largeRelationNames.add(((ScanNode) node.getLeftChild()).getCanonicalName());
@@ -98,7 +99,7 @@
}
if (isSimpleRelationNode(node.getRightChild())) {
- if (getTableVolume((ScanNode) node.getRightChild()) <= context.bcastLimitForCrossJoin * StorageUnit.KB) {
+ if (PlannerUtil.getTableVolume((ScanNode) node.getRightChild()) <= context.bcastLimitForCrossJoin * StorageUnit.KB) {
crossJoinAllowed = true;
} else {
largeRelationNames.add(((ScanNode) node.getRightChild()).getCanonicalName());
@@ -125,25 +126,4 @@
return false;
}
}
-
- /**
- * Get a volume of a table of a partitioned table
- * @param scanNode ScanNode corresponding to a table
- * @return table volume (bytes)
- */
- private static long getTableVolume(ScanNode scanNode) {
- if (scanNode.getTableDesc().hasStats()) {
- long scanBytes = scanNode.getTableDesc().getStats().getNumBytes();
- if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
- PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode;
- if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
- scanBytes = 0L;
- }
- }
-
- return scanBytes;
- } else {
- return -1;
- }
- }
}