blob: 27cbf5f731fec88f5d5be40a7ac267d1e8a105a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.process.join;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.DescTimeComparator;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class LeftOuterTimeJoinOperatorTest {
/**
 * Ascending left outer time join of two single-column children whose TsBlock boundaries do not
 * line up.
 *
 * <p>Every left row must appear exactly once in the output; the right column is filled only for
 * timestamps that also exist on the right side (4 and 13 here) and is null otherwise. Right rows
 * without a left partner must not appear in the output.
 */
@Test
public void testLeftOuterJoin1() {
  // left table
  // Time, s1
  // 4     4
  // 6     6
  // 9     9
  // ----------- TsBlock-1
  // 13    13
  // 17    17
  // ----------- TsBlock-2
  // 22    22
  // 25    25
  // ----------- TsBlock-3

  // right table
  // Time, s2
  // 1     10
  // 2     20
  // 3     30
  // ----------- TsBlock-1
  // 4     40
  // 5     50
  // 10    100
  // ----------- TsBlock-2
  // 13    130
  // 16    160
  // ----------- TsBlock-3
  // 26    260
  // 27    270
  // ----------- TsBlock-4

  // result table
  // Time, s1, s2
  // 4     4   40
  // 6     6   null
  // 9     9   null
  // 13    13  130
  // 17    17  null
  // 22    22  null
  // 25    25  null

  OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
  Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1, TimeUnit.SECONDS));

  // Left child: replays one INT32 column, one TsBlock per row of timeArray.
  Operator leftChild =
      new Operator() {
        private final long[][] timeArray =
            new long[][] {
              {4L, 6L, 9L},
              {13L, 17L},
              {22L, 25L}
            };

        private final int[][] valueArray =
            new int[][] {
              {4, 6, 9},
              {13, 17},
              {22, 25}
            };

        // Indexed as [column][block][row]; this child has a single column.
        private final boolean[][][] valueIsNull =
            new boolean[][][] {
              {
                {false, false, false},
                {false, false},
                {false, false}
              }
            };

        // Index of the next block to hand out.
        private int index = 0;

        @Override
        public OperatorContext getOperatorContext() {
          return operatorContext;
        }

        @Override
        public TsBlock next() {
          TsBlockBuilder builder =
              new TsBlockBuilder(
                  timeArray[index].length, Collections.singletonList(TSDataType.INT32));
          for (int i = 0, size = timeArray[index].length; i < size; i++) {
            builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
            if (valueIsNull[0][index][i]) {
              builder.getColumnBuilder(0).appendNull();
            } else {
              builder.getColumnBuilder(0).writeInt(valueArray[index][i]);
            }
          }
          builder.declarePositions(timeArray[index].length);
          index++;
          return builder.build();
        }

        @Override
        public boolean hasNext() {
          // Derived from the fixture so adding/removing a block cannot desynchronise the bound.
          return index < timeArray.length;
        }

        @Override
        public void close() {}

        @Override
        public boolean isFinished() {
          return index >= timeArray.length;
        }

        @Override
        public long calculateMaxPeekMemory() {
          return 64 * 1024;
        }

        @Override
        public long calculateMaxReturnSize() {
          return 64 * 1024;
        }

        @Override
        public long calculateRetainedSizeAfterCallingNext() {
          return 0;
        }
      };

  // Right child: replays one INT64 column, one TsBlock per row of timeArray.
  Operator rightChild =
      new Operator() {
        private final long[][] timeArray =
            new long[][] {
              {1L, 2L, 3L},
              {4L, 5L, 10L},
              {13L, 16L},
              {26L, 27L}
            };

        private final long[][] valueArray =
            new long[][] {
              {10L, 20L, 30L},
              {40L, 50L, 100L},
              {130L, 160L},
              {260L, 270L}
            };

        // Indexed as [column][block][row]; this child has a single column.
        private final boolean[][][] valueIsNull =
            new boolean[][][] {
              {
                {false, false, false},
                {false, false, false},
                {false, false},
                {false, false}
              }
            };

        // Index of the next block to hand out.
        private int index = 0;

        @Override
        public OperatorContext getOperatorContext() {
          return operatorContext;
        }

        @Override
        public TsBlock next() {
          TsBlockBuilder builder =
              new TsBlockBuilder(
                  timeArray[index].length, Collections.singletonList(TSDataType.INT64));
          for (int i = 0, size = timeArray[index].length; i < size; i++) {
            builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
            if (valueIsNull[0][index][i]) {
              builder.getColumnBuilder(0).appendNull();
            } else {
              builder.getColumnBuilder(0).writeLong(valueArray[index][i]);
            }
          }
          builder.declarePositions(timeArray[index].length);
          index++;
          return builder.build();
        }

        @Override
        public boolean hasNext() {
          // Derived from the fixture so adding/removing a block cannot desynchronise the bound.
          return index < timeArray.length;
        }

        @Override
        public void close() {}

        @Override
        public boolean isFinished() {
          return index >= timeArray.length;
        }

        @Override
        public long calculateMaxPeekMemory() {
          return 64 * 1024;
        }

        @Override
        public long calculateMaxReturnSize() {
          return 64 * 1024;
        }

        @Override
        public long calculateRetainedSizeAfterCallingNext() {
          return 0;
        }
      };

  // The third argument (1) is the number of value columns contributed by the left child.
  LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
      new LeftOuterTimeJoinOperator(
          operatorContext,
          leftChild,
          1,
          rightChild,
          Arrays.asList(TSDataType.INT32, TSDataType.INT64),
          new AscTimeComparator());

  // Memory estimates; both mock children report 64 KiB for peek memory and return size.
  assertEquals(
      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes() + 64 * 1024 * 2,
      leftOuterTimeJoinOperator.calculateMaxPeekMemory());
  assertEquals(
      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
      leftOuterTimeJoinOperator.calculateMaxReturnSize());
  assertEquals(64 * 1024 * 2, leftOuterTimeJoinOperator.calculateRetainedSizeAfterCallingNext());

  // Expected output rows; values at positions flagged as null are placeholders and never read.
  long[] timeArray = new long[] {4L, 6L, 9L, 13L, 17L, 22L, 25L};
  int[] column1Array = new int[] {4, 6, 9, 13, 17, 22, 25};
  boolean[] column1IsNull = new boolean[] {false, false, false, false, false, false, false};
  long[] column2Array = new long[] {40L, 0L, 0L, 130L, 0L, 0L, 0L};
  boolean[] column2IsNull = new boolean[] {false, true, true, false, true, true, true};

  try {
    // Number of output rows verified so far, across all returned TsBlocks.
    int count = 0;
    ListenableFuture<?> listenableFuture = leftOuterTimeJoinOperator.isBlocked();
    listenableFuture.get();
    while (!leftOuterTimeJoinOperator.isFinished() && leftOuterTimeJoinOperator.hasNext()) {
      TsBlock tsBlock = leftOuterTimeJoinOperator.next();
      // next() may legitimately yield nothing on a given call; only non-empty blocks are checked.
      if (tsBlock != null && !tsBlock.isEmpty()) {
        for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
          assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
          assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
          if (!column1IsNull[count]) {
            assertEquals(column1Array[count], tsBlock.getColumn(0).getInt(i));
          }
          assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
          if (!column2IsNull[count]) {
            assertEquals(column2Array[count], tsBlock.getColumn(1).getLong(i));
          }
        }
      }
      // Wait for the operator to become unblocked before polling it again.
      listenableFuture = leftOuterTimeJoinOperator.isBlocked();
      listenableFuture.get();
    }
    assertEquals(timeArray.length, count);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so the surrounding runner can still observe the interruption.
    Thread.currentThread().interrupt();
    fail("Interrupted while waiting for the join operator to unblock: " + e);
  } catch (Exception e) {
    // Keep the original exception as the cause so its stack trace shows up in the test report.
    throw new AssertionError("Unexpected exception while draining LeftOuterTimeJoinOperator", e);
  }
}
/**
 * Descending left outer time join of two two-column children that also hand out {@code null} and
 * empty TsBlocks.
 *
 * <p>Every left row must appear exactly once in the output, in descending time order; the two
 * right columns are filled only for timestamps present on the right side, and per-cell nulls of
 * either side must be carried through unchanged.
 */
@Test
public void testLeftOuterJoin2() {
  // left table
  // Time, s1,   s2
  // 25    null  26
  // 22    22    null
  // ---------------------- TsBlock-1
  // null
  // ---------------------- TsBlock-2
  // 19    19    20
  // 18    18    null
  // 15    null  16
  // ---------------------- TsBlock-3
  // empty
  // ---------------------- TsBlock-4
  // 9     null  null
  // 7     7     null
  // 6     null  7
  // 3     3     4
  // ---------------------- TsBlock-5
  // empty
  // ---------------------- TsBlock-6

  // right table
  // Time, s3,    s4
  // 21    210.0  false
  // 20    200.0  null
  // ---------------------- TsBlock-1
  // empty
  // ---------------------- TsBlock-2
  // 19    190.0  true
  // 18    180.0  null
  // 15    null   false
  // 14    null   null
  // 8     80.0   true
  // 7     null   false
  // ---------------------- TsBlock-3
  // null
  // ---------------------- TsBlock-4
  // 5     50.0   true
  // ---------------------- TsBlock-5
  // 4     40.0   null
  // ---------------------- TsBlock-6
  // 3     30.0   false
  // ---------------------- TsBlock-7
  // 2     20.0   true
  // 1     10.0   false
  // ---------------------- TsBlock-8
  // empty
  // ---------------------- TsBlock-9

  // result table
  // Time, s1,   s2,   s3,    s4
  // 25    null  26    null   null
  // 22    22    null  null   null
  // 19    19    20    190.0  true
  // 18    18    null  180.0  null
  // 15    null  16    null   false
  // 9     null  null  null   null
  // 7     7     null  null   false
  // 6     null  7     null   null
  // 3     3     4     30.0   false

  OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
  Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1, TimeUnit.SECONDS));

  // Left child: INT32 + INT64 columns. A null entry in timeArray makes next() return null; an
  // empty entry makes it return a TsBlock with zero positions.
  Operator leftChild =
      new Operator() {
        private final long[][] timeArray =
            new long[][] {{25L, 22L}, null, {19L, 18L, 15L}, {}, {9L, 7L, 6L, 3L}, {}};

        private final int[][] value1Array =
            new int[][] {{0, 22}, null, {19, 18, 0}, {}, {0, 7, 0, 3}, {}};

        private final long[][] value2Array =
            new long[][] {{26L, 0L}, null, {20L, 0L, 16L}, {}, {0L, 0L, 7L, 4L}, {}};

        // Indexed as [column][block][row].
        private final boolean[][][] valueIsNull =
            new boolean[][][] {
              {{true, false}, null, {false, false, true}, {}, {true, false, true, false}, {}},
              {{false, true}, null, {false, true, false}, {}, {true, true, false, false}, {}}
            };

        // Index of the next block to hand out.
        private int index = 0;

        @Override
        public OperatorContext getOperatorContext() {
          return operatorContext;
        }

        @Override
        public TsBlock next() {
          // A null fixture entry still consumes one slot but produces no TsBlock.
          if (timeArray[index] == null) {
            index++;
            return null;
          }
          TsBlockBuilder builder =
              new TsBlockBuilder(
                  timeArray[index].length, Arrays.asList(TSDataType.INT32, TSDataType.INT64));
          for (int i = 0, size = timeArray[index].length; i < size; i++) {
            builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
            if (valueIsNull[0][index][i]) {
              builder.getColumnBuilder(0).appendNull();
            } else {
              builder.getColumnBuilder(0).writeInt(value1Array[index][i]);
            }
            if (valueIsNull[1][index][i]) {
              builder.getColumnBuilder(1).appendNull();
            } else {
              builder.getColumnBuilder(1).writeLong(value2Array[index][i]);
            }
          }
          builder.declarePositions(timeArray[index].length);
          index++;
          return builder.build();
        }

        @Override
        public boolean hasNext() {
          // Derived from the fixture so adding/removing a block cannot desynchronise the bound.
          return index < timeArray.length;
        }

        @Override
        public void close() {}

        @Override
        public boolean isFinished() {
          return index >= timeArray.length;
        }

        @Override
        public long calculateMaxPeekMemory() {
          return 64 * 1024;
        }

        @Override
        public long calculateMaxReturnSize() {
          return 64 * 1024;
        }

        @Override
        public long calculateRetainedSizeAfterCallingNext() {
          return 0;
        }
      };

  // Right child: FLOAT + BOOLEAN columns, with the same null/empty block conventions as the left.
  Operator rightChild =
      new Operator() {
        private final long[][] timeArray =
            new long[][] {
              {21L, 20L}, {}, {19L, 18L, 15L, 14L, 8L, 7L}, null, {5L}, {4L}, {3L}, {2L, 1L}, {}
            };

        private final float[][] value1Array =
            new float[][] {
              {210.0f, 200.0f},
              {},
              {190.0f, 180.0f, 0.0f, 0.0f, 80.0f, 0.0f},
              null,
              {50.0f},
              {40.0f},
              {30.0f},
              {20.0f, 10.0f},
              {}
            };

        private final boolean[][] value2Array =
            new boolean[][] {
              {false, false},
              {},
              {true, false, false, false, true, false},
              null,
              {true},
              {false},
              {false},
              {true, false},
              {}
            };

        // Indexed as [column][block][row].
        private final boolean[][][] valueIsNull =
            new boolean[][][] {
              {
                {false, false},
                {},
                {false, false, true, true, false, true},
                null,
                {false},
                {false},
                {false},
                {false, false},
                {}
              },
              {
                {false, true},
                {},
                {false, true, false, true, false, false},
                null,
                {false},
                {true},
                {false},
                {false, false},
                {}
              }
            };

        // Index of the next block to hand out.
        private int index = 0;

        @Override
        public OperatorContext getOperatorContext() {
          return operatorContext;
        }

        @Override
        public TsBlock next() {
          // A null fixture entry still consumes one slot but produces no TsBlock.
          if (timeArray[index] == null) {
            index++;
            return null;
          }
          TsBlockBuilder builder =
              new TsBlockBuilder(
                  timeArray[index].length, Arrays.asList(TSDataType.FLOAT, TSDataType.BOOLEAN));
          for (int i = 0, size = timeArray[index].length; i < size; i++) {
            builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
            if (valueIsNull[0][index][i]) {
              builder.getColumnBuilder(0).appendNull();
            } else {
              builder.getColumnBuilder(0).writeFloat(value1Array[index][i]);
            }
            if (valueIsNull[1][index][i]) {
              builder.getColumnBuilder(1).appendNull();
            } else {
              builder.getColumnBuilder(1).writeBoolean(value2Array[index][i]);
            }
          }
          builder.declarePositions(timeArray[index].length);
          index++;
          return builder.build();
        }

        @Override
        public boolean hasNext() {
          // Derived from the fixture so adding/removing a block cannot desynchronise the bound.
          return index < timeArray.length;
        }

        @Override
        public void close() {}

        @Override
        public boolean isFinished() {
          return index >= timeArray.length;
        }

        @Override
        public long calculateMaxPeekMemory() {
          return 64 * 1024;
        }

        @Override
        public long calculateMaxReturnSize() {
          return 64 * 1024;
        }

        @Override
        public long calculateRetainedSizeAfterCallingNext() {
          return 0;
        }
      };

  // The third argument (2) is the number of value columns contributed by the left child.
  LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
      new LeftOuterTimeJoinOperator(
          operatorContext,
          leftChild,
          2,
          rightChild,
          Arrays.asList(TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.BOOLEAN),
          new DescTimeComparator());

  // Expected output rows; values at positions flagged as null are placeholders and never read.
  long[] timeArray = new long[] {25L, 22L, 19L, 18L, 15L, 9L, 7L, 6L, 3L};
  int[] column1Array = new int[] {0, 22, 19, 18, 0, 0, 7, 0, 3};
  boolean[] column1IsNull =
      new boolean[] {true, false, false, false, true, true, false, true, false};
  long[] column2Array = new long[] {26L, 0L, 20L, 0L, 16L, 0L, 0L, 7L, 4L};
  boolean[] column2IsNull =
      new boolean[] {false, true, false, true, false, true, true, false, false};
  float[] column3Array = new float[] {0.0f, 0.0f, 190.0f, 180.0f, 0.0f, 0.0f, 0.0f, 0.0f, 30.0f};
  boolean[] column3IsNull =
      new boolean[] {true, true, false, false, true, true, true, true, false};
  boolean[] column4Array =
      new boolean[] {false, false, true, false, false, false, false, false, false};
  boolean[] column4IsNull =
      new boolean[] {true, true, false, true, false, true, false, true, false};

  try {
    // Number of output rows verified so far, across all returned TsBlocks.
    int count = 0;
    ListenableFuture<?> listenableFuture = leftOuterTimeJoinOperator.isBlocked();
    listenableFuture.get();
    while (!leftOuterTimeJoinOperator.isFinished() && leftOuterTimeJoinOperator.hasNext()) {
      TsBlock tsBlock = leftOuterTimeJoinOperator.next();
      // next() may legitimately yield nothing on a given call; only non-empty blocks are checked.
      if (tsBlock != null && !tsBlock.isEmpty()) {
        for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
          assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
          assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
          if (!column1IsNull[count]) {
            assertEquals(column1Array[count], tsBlock.getColumn(0).getInt(i));
          }
          assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
          if (!column2IsNull[count]) {
            assertEquals(column2Array[count], tsBlock.getColumn(1).getLong(i));
          }
          assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
          if (!column3IsNull[count]) {
            assertEquals(column3Array[count], tsBlock.getColumn(2).getFloat(i), 0.000001);
          }
          assertEquals(column4IsNull[count], tsBlock.getColumn(3).isNull(i));
          if (!column4IsNull[count]) {
            assertEquals(column4Array[count], tsBlock.getColumn(3).getBoolean(i));
          }
        }
      }
      // Wait for the operator to become unblocked before polling it again.
      listenableFuture = leftOuterTimeJoinOperator.isBlocked();
      listenableFuture.get();
    }
    assertEquals(timeArray.length, count);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so the surrounding runner can still observe the interruption.
    Thread.currentThread().interrupt();
    fail("Interrupted while waiting for the join operator to unblock: " + e);
  } catch (Exception e) {
    // Keep the original exception as the cause so its stack trace shows up in the test report.
    throw new AssertionError("Unexpected exception while draining LeftOuterTimeJoinOperator", e);
  }
}
/**
 * Ascending left outer time join where the left child keeps producing rows (100..111) after the
 * right child's last timestamp (27).
 *
 * <p>Every left row must still appear exactly once; all left rows without a matching right
 * timestamp, including the trailing ones, carry a null right column.
 */
@Test
public void testLeftOuterJoin3() {
  // left table
  // Time, s1
  // 4     4
  // 6     6
  // 9     9
  // ----------- TsBlock-1
  // 13    13
  // 17    17
  // ----------- TsBlock-2
  // 22    22
  // 25    25
  // ----------- TsBlock-3
  // 100   100
  // 101   101
  // ----------- TsBlock-4
  // 110   110
  // 111   111
  // ----------- TsBlock-5

  // right table
  // Time, s2
  // 1     10
  // 2     20
  // 3     30
  // ----------- TsBlock-1
  // 4     40
  // 5     50
  // 10    100
  // ----------- TsBlock-2
  // 13    130
  // 16    160
  // ----------- TsBlock-3
  // 26    260
  // 27    270
  // ----------- TsBlock-4

  // result table
  // Time, s1,  s2
  // 4     4    40
  // 6     6    null
  // 9     9    null
  // 13    13   130
  // 17    17   null
  // 22    22   null
  // 25    25   null
  // 100   100  null
  // 101   101  null
  // 110   110  null
  // 111   111  null

  OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
  Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1, TimeUnit.SECONDS));

  // Left child: replays one INT32 column, one TsBlock per row of timeArray.
  Operator leftChild =
      new Operator() {
        private final long[][] timeArray =
            new long[][] {
              {4L, 6L, 9L},
              {13L, 17L},
              {22L, 25L},
              {100L, 101L},
              {110L, 111L}
            };

        private final int[][] valueArray =
            new int[][] {
              {4, 6, 9},
              {13, 17},
              {22, 25},
              {100, 101},
              {110, 111}
            };

        // Indexed as [column][block][row]; this child has a single column.
        private final boolean[][][] valueIsNull =
            new boolean[][][] {
              {
                {false, false, false},
                {false, false},
                {false, false},
                {false, false},
                {false, false}
              }
            };

        // Index of the next block to hand out.
        private int index = 0;

        @Override
        public OperatorContext getOperatorContext() {
          return operatorContext;
        }

        @Override
        public TsBlock next() {
          TsBlockBuilder builder =
              new TsBlockBuilder(
                  timeArray[index].length, Collections.singletonList(TSDataType.INT32));
          for (int i = 0, size = timeArray[index].length; i < size; i++) {
            builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
            if (valueIsNull[0][index][i]) {
              builder.getColumnBuilder(0).appendNull();
            } else {
              builder.getColumnBuilder(0).writeInt(valueArray[index][i]);
            }
          }
          builder.declarePositions(timeArray[index].length);
          index++;
          return builder.build();
        }

        @Override
        public boolean hasNext() {
          // Derived from the fixture so adding/removing a block cannot desynchronise the bound.
          return index < timeArray.length;
        }

        @Override
        public void close() {}

        @Override
        public boolean isFinished() {
          return index >= timeArray.length;
        }

        @Override
        public long calculateMaxPeekMemory() {
          return 64 * 1024;
        }

        @Override
        public long calculateMaxReturnSize() {
          return 64 * 1024;
        }

        @Override
        public long calculateRetainedSizeAfterCallingNext() {
          return 0;
        }
      };

  // Right child: replays one INT64 column, one TsBlock per row of timeArray.
  Operator rightChild =
      new Operator() {
        private final long[][] timeArray =
            new long[][] {
              {1L, 2L, 3L},
              {4L, 5L, 10L},
              {13L, 16L},
              {26L, 27L}
            };

        private final long[][] valueArray =
            new long[][] {
              {10L, 20L, 30L},
              {40L, 50L, 100L},
              {130L, 160L},
              {260L, 270L}
            };

        // Indexed as [column][block][row]; this child has a single column.
        private final boolean[][][] valueIsNull =
            new boolean[][][] {
              {
                {false, false, false},
                {false, false, false},
                {false, false},
                {false, false}
              }
            };

        // Index of the next block to hand out.
        private int index = 0;

        @Override
        public OperatorContext getOperatorContext() {
          return operatorContext;
        }

        @Override
        public TsBlock next() {
          TsBlockBuilder builder =
              new TsBlockBuilder(
                  timeArray[index].length, Collections.singletonList(TSDataType.INT64));
          for (int i = 0, size = timeArray[index].length; i < size; i++) {
            builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
            if (valueIsNull[0][index][i]) {
              builder.getColumnBuilder(0).appendNull();
            } else {
              builder.getColumnBuilder(0).writeLong(valueArray[index][i]);
            }
          }
          builder.declarePositions(timeArray[index].length);
          index++;
          return builder.build();
        }

        @Override
        public boolean hasNext() {
          // Derived from the fixture so adding/removing a block cannot desynchronise the bound.
          return index < timeArray.length;
        }

        @Override
        public void close() {}

        @Override
        public boolean isFinished() {
          return index >= timeArray.length;
        }

        @Override
        public long calculateMaxPeekMemory() {
          return 64 * 1024;
        }

        @Override
        public long calculateMaxReturnSize() {
          return 64 * 1024;
        }

        @Override
        public long calculateRetainedSizeAfterCallingNext() {
          return 0;
        }
      };

  // The third argument (1) is the number of value columns contributed by the left child.
  LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
      new LeftOuterTimeJoinOperator(
          operatorContext,
          leftChild,
          1,
          rightChild,
          Arrays.asList(TSDataType.INT32, TSDataType.INT64),
          new AscTimeComparator());

  // Memory estimates; both mock children report 64 KiB for peek memory and return size.
  assertEquals(
      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes() + 64 * 1024 * 2,
      leftOuterTimeJoinOperator.calculateMaxPeekMemory());
  assertEquals(
      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
      leftOuterTimeJoinOperator.calculateMaxReturnSize());
  assertEquals(64 * 1024 * 2, leftOuterTimeJoinOperator.calculateRetainedSizeAfterCallingNext());

  // Expected output rows; values at positions flagged as null are placeholders and never read.
  long[] timeArray = new long[] {4L, 6L, 9L, 13L, 17L, 22L, 25L, 100L, 101L, 110L, 111L};
  int[] column1Array = new int[] {4, 6, 9, 13, 17, 22, 25, 100, 101, 110, 111};
  boolean[] column1IsNull =
      new boolean[] {false, false, false, false, false, false, false, false, false, false, false};
  long[] column2Array = new long[] {40L, 0L, 0L, 130L, 0L, 0L, 0L, 0L, 0L, 0L, 0L};
  boolean[] column2IsNull =
      new boolean[] {false, true, true, false, true, true, true, true, true, true, true};

  try {
    // Number of output rows verified so far, across all returned TsBlocks.
    int count = 0;
    ListenableFuture<?> listenableFuture = leftOuterTimeJoinOperator.isBlocked();
    listenableFuture.get();
    while (!leftOuterTimeJoinOperator.isFinished() && leftOuterTimeJoinOperator.hasNext()) {
      TsBlock tsBlock = leftOuterTimeJoinOperator.next();
      // next() may legitimately yield nothing on a given call; only non-empty blocks are checked.
      if (tsBlock != null && !tsBlock.isEmpty()) {
        for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
          assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
          assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
          if (!column1IsNull[count]) {
            assertEquals(column1Array[count], tsBlock.getColumn(0).getInt(i));
          }
          assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
          if (!column2IsNull[count]) {
            assertEquals(column2Array[count], tsBlock.getColumn(1).getLong(i));
          }
        }
      }
      // Wait for the operator to become unblocked before polling it again.
      listenableFuture = leftOuterTimeJoinOperator.isBlocked();
      listenableFuture.get();
    }
    assertEquals(timeArray.length, count);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so the surrounding runner can still observe the interruption.
    Thread.currentThread().interrupt();
    fail("Interrupted while waiting for the join operator to unblock: " + e);
  } catch (Exception e) {
    // Keep the original exception as the cause so its stack trace shows up in the test report.
    throw new AssertionError("Unexpected exception while draining LeftOuterTimeJoinOperator", e);
  }
}
}