blob: 51366284d5e2332ddaa242408c7364006d0fd12c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.join;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.SubDirTestWatcher;
import org.apache.drill.test.TestBuilder;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;
/**
 * Runs joins with hash join disabled (and the union type enabled) over JSON inputs whose schema
 * changes across files or within a file: int/float key mixes, numeric/string key mixes, and
 * columns that are missing from or newly appear in some files.
 */
@Category(OperatorTest.class)
public class TestMergeJoinWithSchemaChanges extends BaseTestQuery {
  public static final Path LEFT_DIR = Paths.get("mergejoin-schemachanges-left");
  public static final Path RIGHT_DIR = Paths.get("mergejoin-schemachanges-right");

  /** Session options set before every query: disable hash join and enable the union type. */
  private static final String MERGE_JOIN_OPTIONS =
      "alter session set `planner.enable_hashjoin` = false; "
          + "alter session set `exec.enable_union_type` = true";

  private static File leftDir;
  private static File rightDir;

  /** How the key and value of each generated JSON record are rendered. */
  private enum JsonType {
    /** Unquoted integer, e.g. {@code 42}. */
    INT {
      @Override
      String literal(int i) {
        return Integer.toString(i);
      }
    },
    /** Unquoted decimal with six fraction digits, e.g. {@code 42.000000}. */
    FLOAT {
      @Override
      String literal(int i) {
        // Locale.ROOT pins '.' as the decimal separator; the default locale may use ',' and
        // would then produce invalid JSON.
        return String.format(Locale.ROOT, "%f", (float) i);
      }
    },
    /** Quoted decimal digits, e.g. {@code "42"}. */
    STRING {
      @Override
      String literal(int i) {
        return "\"" + i + "\"";
      }
    };

    /** Returns the JSON literal used for both the key and the value of record {@code i}. */
    abstract String literal(int i);
  }

  /** Resolves the left and right input directories under the test watcher's root directory. */
  @BeforeClass
  public static void getDirs() {
    leftDir = dirTestWatcher.getRootDir()
        .toPath()
        .resolve(LEFT_DIR)
        .toFile();
    rightDir = dirTestWatcher.getRootDir()
        .toPath()
        .resolve(RIGHT_DIR)
        .toFile();
  }

  @Rule
  public final SubDirTestWatcher subDirTestWatcher = new SubDirTestWatcher.Builder(dirTestWatcher.getRootDir())
      .addSubDir(LEFT_DIR)
      .addSubDir(RIGHT_DIR)
      .build();

  /** Joins int/float keys on the left with int/float keys on the right (INNER and LEFT). */
  @Test
  public void testNumericTypes() throws Exception {
    // Left side: int keys in l1.json, float keys in l2.json.
    writeJsonFile(leftDir, "l1.json", "kl", "vl", JsonType.INT, 0, 5000);
    writeJsonFile(leftDir, "l2.json", "kl", "vl", JsonType.FLOAT, 1000, 6000);
    // Right side: int keys in r1.json, float keys in r2.json.
    writeJsonFile(rightDir, "r1.json", "kr", "vr", JsonType.INT, 2000, 7000);
    writeJsonFile(rightDir, "r2.json", "kr", "vr", JsonType.FLOAT, 3000, 8000);

    // INNER JOIN
    TestBuilder builder = mergeJoinTestBuilder("inner", "L.kl=R.kr", "kl", "vl", "kr", "vr");
    addNumericTypeMatches(builder);
    builder.go();

    // LEFT JOIN: matched rows plus left rows whose key has no right-side counterpart.
    builder = mergeJoinTestBuilder("left", "L.kl=R.kr", "kl", "vl", "kr", "vr");
    for (long i = 0; i < 2000; ++i) {
      builder.baselineValues(i, i, null, null);
    }
    for (long i = 1000; i < 2000; ++i) {
      builder.baselineValues((double) i, (double) i, null, null);
    }
    addNumericTypeMatches(builder);
    builder.go();
  }

  /** Joins int/string keys on the left with float/string keys on the right (INNER and RIGHT). */
  @Test
  public void testNumericStringTypes() throws Exception {
    // Left side: int keys in l1.json, string keys in l2.json.
    writeJsonFile(leftDir, "l1.json", "kl", "vl", JsonType.INT, 0, 5000);
    writeJsonFile(leftDir, "l2.json", "kl", "vl", JsonType.STRING, 1000, 6000);
    // Right side: float keys in r1.json, string keys in r2.json.
    writeJsonFile(rightDir, "r1.json", "kr", "vr", JsonType.FLOAT, 2000, 7000);
    writeJsonFile(rightDir, "r2.json", "kr", "vr", JsonType.STRING, 3000, 8000);

    // INNER JOIN
    TestBuilder builder = mergeJoinTestBuilder("inner", "L.kl=R.kr", "kl", "vl", "kr", "vr");
    addNumericStringMatches(builder);
    builder.go();

    // RIGHT JOIN: matched rows plus right rows whose key has no left-side counterpart.
    builder = mergeJoinTestBuilder("right", "L.kl=R.kr", "kl", "vl", "kr", "vr");
    addNumericStringMatches(builder);
    for (long i = 5000; i < 7000; ++i) {
      builder.baselineValues(null, null, (double) i, (double) i);
    }
    for (long i = 6000; i < 8000; ++i) {
      final String d = Long.toString(i);
      builder.baselineValues(null, null, d, d);
    }
    builder.go();
  }

  /** Joins inputs where the join key column is absent from some files on each side. */
  @Ignore("DRILL-5612")
  @Test
  public void testMissingAndNewColumns() throws Exception {
    // Left side: only l2.json carries the join key column kl.
    writeJsonFile(leftDir, "l1.json", "kl1", "vl1", JsonType.INT, 0, 50);
    writeJsonFile(leftDir, "l2.json", "kl", "vl", JsonType.INT, 50, 100);
    writeJsonFile(leftDir, "l3.json", "kl2", "vl2", JsonType.INT, 100, 150);
    // Right side: only r2.json carries the join key column kr.
    writeJsonFile(rightDir, "r1.json", "kr1", "vr1", JsonType.FLOAT, 0, 50);
    writeJsonFile(rightDir, "r2.json", "kr", "vr", JsonType.FLOAT, 50, 100);
    writeJsonFile(rightDir, "r3.json", "kr2", "vr2", JsonType.FLOAT, 100, 150);

    final String[] columns = {
        "kl", "vl", "kr", "vr", "kl1", "vl1", "kl2", "vl2", "kr1", "vr1", "kr2", "vr2"};

    // INNER JOIN
    TestBuilder builder = mergeJoinTestBuilder("inner", "L.kl=R.kr", columns);
    for (long i = 50; i < 100; ++i) {
      builder.baselineValues(i, i, (double) i, (double) i, null, null, null, null, null, null, null, null);
    }
    builder.go();

    // LEFT JOIN
    builder = mergeJoinTestBuilder("left", "L.kl=R.kr", columns);
    for (long i = 0; i < 50; ++i) {
      builder.baselineValues(null, null, null, null, i, i, null, null, null, null, null, null);
    }
    for (long i = 50; i < 100; ++i) {
      builder.baselineValues(i, i, (double) i, (double) i, null, null, null, null, null, null, null, null);
    }
    for (long i = 100; i < 150; ++i) {
      builder.baselineValues(null, null, null, null, null, null, i, i, null, null, null, null);
    }
    builder.go();

    // RIGHT JOIN
    builder = mergeJoinTestBuilder("right", "L.kl=R.kr", columns);
    for (long i = 0; i < 50; ++i) {
      builder.baselineValues(null, null, null, null, null, null, null, null, (double) i, (double) i, null, null);
    }
    for (long i = 50; i < 100; ++i) {
      builder.baselineValues(i, i, (double) i, (double) i, null, null, null, null, null, null, null, null);
    }
    for (long i = 100; i < 150; ++i) {
      builder.baselineValues(null, null, null, null, null, null, null, null, null, null, (double) i, (double) i);
    }
    builder.go();
  }

  /** Left key type changes from int to float within one file; right side is int only. */
  @Test
  public void testOneSideSchemaChanges() throws Exception {
    try (BufferedWriter writer = newJsonWriter(leftDir, "l1.json")) {
      writeRecords(writer, "kl", "vl", JsonType.INT, 0, 50);
      writeRecords(writer, "kl", "vl", JsonType.FLOAT, 50, 100);
    }
    writeJsonFile(rightDir, "r1.json", "kl", "vl", JsonType.INT, 0, 50);

    // Both sides name their columns kl/vl; the right side's copies come back as kl0/vl0.
    TestBuilder builder = mergeJoinTestBuilder("inner", "L.kl=R.kl", "kl", "vl", "kl0", "vl0");
    for (long i = 0; i < 50; ++i) {
      builder.baselineValues(i, i, i, i);
    }
    builder.go();
  }

  /**
   * Builds an unordered test for {@code select * from LEFT_DIR L <joinType> join RIGHT_DIR R on
   * <joinCondition>} with hash join disabled and union types enabled.
   *
   * @param joinType SQL join keyword, e.g. {@code inner}, {@code left}, {@code right}
   * @param joinCondition join predicate using the {@code L} and {@code R} aliases
   * @param baselineColumns expected output column names
   * @return a builder ready to receive baseline rows
   */
  private TestBuilder mergeJoinTestBuilder(String joinType, String joinCondition,
      String... baselineColumns) {
    String query = String.format("select * from dfs.`%s` L %s join dfs.`%s` R on %s",
        LEFT_DIR, joinType, RIGHT_DIR, joinCondition);
    return testBuilder()
        .sqlQuery(query)
        .optionSettingQueriesForTestQuery(MERGE_JOIN_OPTIONS)
        .unOrdered()
        .baselineColumns(baselineColumns);
  }

  /** Expected matched rows of testNumericTypes; identical for its INNER and LEFT joins. */
  private static void addNumericTypeMatches(TestBuilder builder) {
    for (long i = 2000; i < 3000; ++i) {
      builder.baselineValues(i, i, i, i);
      builder.baselineValues((double) i, (double) i, i, i);
    }
    for (long i = 3000; i < 5000; ++i) {
      builder.baselineValues(i, i, i, i);
      builder.baselineValues(i, i, (double) i, (double) i);
      builder.baselineValues((double) i, (double) i, i, i);
      builder.baselineValues((double) i, (double) i, (double) i, (double) i);
    }
    for (long i = 5000; i < 6000; ++i) {
      builder.baselineValues((double) i, (double) i, i, i);
      builder.baselineValues((double) i, (double) i, (double) i, (double) i);
    }
  }

  /**
   * Expected matched rows of testNumericStringTypes; identical for its INNER and RIGHT joins.
   * The baseline contains no row pairing a numeric key with a string key.
   */
  private static void addNumericStringMatches(TestBuilder builder) {
    for (long i = 2000; i < 5000; ++i) {
      builder.baselineValues(i, i, (double) i, (double) i);
    }
    for (long i = 3000; i < 6000; ++i) {
      final String d = Long.toString(i);
      builder.baselineValues(d, d, d, d);
    }
  }

  /** Creates (or truncates) {@code dir/fileName} and returns a buffered writer for it. */
  private static BufferedWriter newJsonWriter(File dir, String fileName) throws IOException {
    // Generated content is plain ASCII, so FileWriter's default charset is adequate.
    return new BufferedWriter(new FileWriter(new File(dir, fileName)));
  }

  /** Writes one JSON record per i in [from, to) to a new file; the file is always closed. */
  private static void writeJsonFile(File dir, String fileName, String keyColumn,
      String valueColumn, JsonType type, int from, int to) throws IOException {
    try (BufferedWriter writer = newJsonWriter(dir, fileName)) {
      writeRecords(writer, keyColumn, valueColumn, type, from, to);
    }
  }

  /**
   * Appends {@code { "<keyColumn>" : <lit> , "<valueColumn>": <lit> }} lines for each i in
   * [from, to), where {@code <lit>} is {@code type.literal(i)}.
   */
  private static void writeRecords(BufferedWriter writer, String keyColumn, String valueColumn,
      JsonType type, int from, int to) throws IOException {
    for (int i = from; i < to; ++i) {
      String literal = type.literal(i);
      writer.write(String.format(Locale.ROOT, "{ \"%s\" : %s , \"%s\": %s }\n",
          keyColumn, literal, valueColumn, literal));
    }
  }
}