blob: ebb454b039a61228255cd8872252093494cc5429 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.process.join;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.DescTimeComparator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import org.junit.Test;
import org.mockito.Mockito;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class InnerTimeJoinOperatorTest {
@Test
public void testInnerJoin1() {
// left table
// Time, s1
// 4 4
// 6 6
// 9 9
// ----------- TsBlock-1
// 13 13
// 17 17
// ----------- TsBlock-2
// 22 22
// 25 25
// ----------- TsBlock-3
// right table
// Time, s2
// 1 10
// 2 20
// 3 30
// ----------- TsBlock-1
// 4 40
// 5 50
// 10 100
// ----------- TsBlock-2
// 13 130
// 16 160
// ----------- TsBlock-3
// 26 260
// 27 270
// ----------- TsBlock-4
// result table
// Time, s1, s2
// 4, 4, 40
// 13 13, 130
OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1, TimeUnit.SECONDS));
Operator leftChild =
new Operator() {
private final long[][] timeArray =
new long[][] {
{4L, 6L, 9L},
{13L, 17L},
{22L, 25L}
};
private final int[][] valueArray =
new int[][] {
{4, 6, 9},
{13, 17},
{22, 25}
};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{
{false, false, false},
{false, false},
{false, false}
}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
TsBlockBuilder builder =
new TsBlockBuilder(
timeArray[index].length, Collections.singletonList(TSDataType.INT32));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeInt(valueArray[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 3;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 3;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Operator rightChild =
new Operator() {
private final long[][] timeArray =
new long[][] {
{1L, 2L, 3L},
{4L, 5L, 10L},
{13L, 16L},
{26L, 27L}
};
private final long[][] valueArray =
new long[][] {
{10L, 20L, 30L},
{40L, 50L, 100L},
{130L, 160L},
{260L, 270L}
};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{
{false, false, false},
{false, false, false},
{false, false},
{false, false}
}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
TsBlockBuilder builder =
new TsBlockBuilder(
timeArray[index].length, Collections.singletonList(TSDataType.INT64));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeLong(valueArray[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 4;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 4;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Map<InputLocation, Integer> outputColumnMap = new HashMap<>();
outputColumnMap.put(new InputLocation(0, 0), 0);
outputColumnMap.put(new InputLocation(1, 0), 1);
InnerTimeJoinOperator innerTimeJoinOperator =
new InnerTimeJoinOperator(
operatorContext,
Arrays.asList(leftChild, rightChild),
Arrays.asList(TSDataType.INT32, TSDataType.INT64),
new AscTimeComparator(),
outputColumnMap);
assertEquals(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes() + 64 * 1024 * 2,
innerTimeJoinOperator.calculateMaxPeekMemory());
assertEquals(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
innerTimeJoinOperator.calculateMaxReturnSize());
assertEquals(64 * 1024, innerTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
long[] timeArray = new long[] {4L, 13L};
int[] column1Array = new int[] {4, 13};
boolean[] column1IsNull = new boolean[] {false, false};
long[] column2Array = new long[] {40L, 130L};
boolean[] column2IsNull = new boolean[] {false, false};
try {
int count = 0;
ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
listenableFuture.get();
while (!innerTimeJoinOperator.isFinished() && innerTimeJoinOperator.hasNext()) {
TsBlock tsBlock = innerTimeJoinOperator.next();
if (tsBlock != null && !tsBlock.isEmpty()) {
for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
if (!column1IsNull[count]) {
assertEquals(column1Array[count], tsBlock.getColumn(0).getInt(i));
}
assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
if (!column2IsNull[count]) {
assertEquals(column2Array[count], tsBlock.getColumn(1).getLong(i));
}
}
}
listenableFuture = innerTimeJoinOperator.isBlocked();
listenableFuture.get();
}
assertEquals(timeArray.length, count);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testInnerJoin2() {
// left table
// Time, s1, s2
// 25 null 26
// 22 22 null
// ---------------------- TsBlock-1
// null
// ---------------------- TsBlock-2
// 19 19 20
// 18 18 null
// 15 null 16
// ---------------------- TsBlock-3
// empty
// ---------------------- TsBlock-4
// 9 null null
// 7 7 null
// 6 null 7
// 3 3 4
// ---------------------- TsBlock-5
// empty
// ---------------------- TsBlock-6
// right table
// Time, s3, s4
// 21 210.0 false
// 20 200.0 null
// ---------------------- TsBlock-1
// empty
// ---------------------- TsBlock-2
// 19 190.0 true
// 18 180.0 null
// 15 null false
// 14 null null
// 8 80.0 true
// 7 null false
// ---------------------- TsBlock-3
// null
// ---------------------- TsBlock-4
// 5 50.0 true
// ---------------------- TsBlock-5
// 4 40.0 null
// ---------------------- TsBlock-6
// 3 30.0 false
// ---------------------- TsBlock-7
// 2 20.0 true
// 1 10.0 false
// ---------------------- TsBlock-8
// empty
// ---------------------- TsBlock-9
// result table
// Time, s1, s2, s3, s4
// 19 19 20 190.0 true
// 18 18 null 180.0 null
// 15 null 16 null false
// 7 7 null null false
// 3 3 4 30.0 false
OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1000, TimeUnit.SECONDS));
Operator leftChild =
new Operator() {
private final long[][] timeArray =
new long[][] {{25L, 22L}, null, {19L, 18L, 15L}, {}, {9L, 7L, 6L, 3L}, {}};
private final int[][] value1Array =
new int[][] {{0, 22}, null, {19, 18, 0}, {}, {0, 7, 0, 3}, {}};
private final long[][] value2Array =
new long[][] {{26L, 0L}, null, {20L, 0L, 16L}, {}, {0L, 0L, 7L, 4L}, {}};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{{true, false}, null, {false, false, true}, {}, {true, false, true, false}, {}},
{{false, true}, null, {false, true, false}, {}, {true, true, false, false}, {}}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
if (timeArray[index] == null) {
index++;
return null;
}
TsBlockBuilder builder =
new TsBlockBuilder(
timeArray[index].length, Arrays.asList(TSDataType.INT32, TSDataType.INT64));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeInt(value1Array[index][i]);
}
if (valueIsNull[1][index][i]) {
builder.getColumnBuilder(1).appendNull();
} else {
builder.getColumnBuilder(1).writeLong(value2Array[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 6;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 6;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Operator rightChild =
new Operator() {
private final long[][] timeArray =
new long[][] {
{21L, 20L}, {}, {19L, 18L, 15L, 14L, 8L, 7L}, null, {5L}, {4L}, {3L}, {2L, 1L}, {}
};
private final float[][] value1Array =
new float[][] {
{210.0f, 200.0f},
{},
{190.0f, 180.0f, 0.0f, 0.0f, 80.0f, 0.0f},
null,
{50.0f},
{40.0f},
{30.0f},
{20.0f, 10.0f},
{}
};
private final boolean[][] value2Array =
new boolean[][] {
{false, false},
{},
{true, false, false, false, true, false},
null,
{true},
{false},
{false},
{true, false},
{}
};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{
{false, false},
{},
{false, false, true, true, false, true},
null,
{false},
{false},
{false},
{false, false},
{}
},
{
{false, true},
{},
{false, true, false, true, false, false},
null,
{false},
{true},
{false},
{false, false},
{}
}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
if (timeArray[index] == null) {
index++;
return null;
}
TsBlockBuilder builder =
new TsBlockBuilder(
timeArray[index].length, Arrays.asList(TSDataType.FLOAT, TSDataType.BOOLEAN));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeFloat(value1Array[index][i]);
}
if (valueIsNull[1][index][i]) {
builder.getColumnBuilder(1).appendNull();
} else {
builder.getColumnBuilder(1).writeBoolean(value2Array[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 9;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 9;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Map<InputLocation, Integer> outputColumnMap = new HashMap<>();
outputColumnMap.put(new InputLocation(0, 0), 0);
outputColumnMap.put(new InputLocation(0, 1), 1);
outputColumnMap.put(new InputLocation(1, 0), 2);
outputColumnMap.put(new InputLocation(1, 1), 3);
InnerTimeJoinOperator innerTimeJoinOperator =
new InnerTimeJoinOperator(
operatorContext,
Arrays.asList(leftChild, rightChild),
Arrays.asList(TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.BOOLEAN),
new DescTimeComparator(),
outputColumnMap);
long[] timeArray = new long[] {19L, 18L, 15L, 7L, 3L};
int[] column1Array = new int[] {19, 18, 0, 7, 3};
boolean[] column1IsNull = new boolean[] {false, false, true, false, false};
long[] column2Array = new long[] {20L, 0L, 16L, 0L, 4L};
boolean[] column2IsNull = new boolean[] {false, true, false, true, false};
float[] column3Array = new float[] {190.0f, 180.0f, 0.0f, 0.0f, 30.0f};
boolean[] column3IsNull = new boolean[] {false, false, true, true, false};
boolean[] column4Array = new boolean[] {true, false, false, false, false};
boolean[] column4IsNull = new boolean[] {false, true, false, false, false};
try {
int count = 0;
ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
listenableFuture.get();
while (!innerTimeJoinOperator.isFinished() && innerTimeJoinOperator.hasNext()) {
TsBlock tsBlock = innerTimeJoinOperator.next();
if (tsBlock != null && !tsBlock.isEmpty()) {
for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
if (!column1IsNull[count]) {
assertEquals(column1Array[count], tsBlock.getColumn(0).getInt(i));
}
assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
if (!column2IsNull[count]) {
assertEquals(column2Array[count], tsBlock.getColumn(1).getLong(i));
}
assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
if (!column3IsNull[count]) {
assertEquals(column3Array[count], tsBlock.getColumn(2).getFloat(i), 0.000001);
}
assertEquals(column4IsNull[count], tsBlock.getColumn(3).isNull(i));
if (!column4IsNull[count]) {
assertEquals(column4Array[count], tsBlock.getColumn(3).getBoolean(i));
}
}
}
listenableFuture = innerTimeJoinOperator.isBlocked();
listenableFuture.get();
}
assertEquals(timeArray.length, count);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testInnerJoin3() {
// child-1
// Time, s1, s2
// 100 100 200
// 90 90 180
// 80 80 160
// 70 70 140
// 60 60 120
// 50 50 100
// 40 40 80
// 30 30 60
// 20 20 40
// 10 10 20
// 0 0 0
// ---------------------- TsBlock-1
// child-2
// Time, s3, s4
// 1000 3000.0 false
// 500 500.0 true
// 100 300.0 null
// ------------------------- TsBlock-1
// 99 99.0 true
// 95 95.0 null
// 90 null false
// ------------------------- TsBlock-2
// 50 150.0 true
// 48 48.0 true
// 20 60.0 null
// 10 null false
// ------------------------- TsBlock-3
// result table
// Time, s1, s2, s3, s4
// 100 100 200 300.0 null
// 90 90 180 null false
// 50 50 100 150.0 true
// 20 20 40 60.0 null
// 10 10 20 null false
OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1000, TimeUnit.SECONDS));
Operator child1 =
new Operator() {
private final long[][] timeArray =
new long[][] {{100L, 90L, 80L, 70L, 60L, 50L, 40L, 30L, 20L, 10L, 0L}};
private final int[][] value1Array =
new int[][] {{100, 90, 80, 70, 60, 50, 40, 30, 20, 10, 0}};
private final long[][] value2Array =
new long[][] {{200L, 180L, 160, 140, 120, 100L, 80L, 60L, 40L, 20L, 0L}};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{{false, false, false, false, false, false, false, false, false, false, false}},
{{false, false, false, false, false, false, false, false, false, false, false}}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
if (timeArray[index] == null) {
index++;
return null;
}
TsBlockBuilder builder =
new TsBlockBuilder(
timeArray[index].length, Arrays.asList(TSDataType.INT32, TSDataType.INT64));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeInt(value1Array[index][i]);
}
if (valueIsNull[1][index][i]) {
builder.getColumnBuilder(1).appendNull();
} else {
builder.getColumnBuilder(1).writeLong(value2Array[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 1;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 1;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Operator child2 =
new Operator() {
private final long[][] timeArray =
new long[][] {{1000L, 500L, 100L}, {99L, 95L, 90L}, {50L, 48L, 20L, 10L}};
private final float[][] value1Array =
new float[][] {
{3000.0f, 500.0f, 300.0f},
{99.0f, 95.0f, 0.0f},
{150.0f, 48.0f, 60.0f, 0.0f}
};
private final boolean[][] value2Array =
new boolean[][] {
{false, true, false},
{true, false, false},
{true, true, false, false}
};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{
{false, false, false},
{false, false, true},
{false, false, false, true}
},
{
{false, false, true},
{false, true, false},
{false, false, true, false}
}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
if (timeArray[index] == null) {
index++;
return null;
}
TsBlockBuilder builder =
new TsBlockBuilder(
timeArray[index].length, Arrays.asList(TSDataType.FLOAT, TSDataType.BOOLEAN));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeFloat(value1Array[index][i]);
}
if (valueIsNull[1][index][i]) {
builder.getColumnBuilder(1).appendNull();
} else {
builder.getColumnBuilder(1).writeBoolean(value2Array[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 3;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 3;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Map<InputLocation, Integer> outputColumnMap = new HashMap<>();
outputColumnMap.put(new InputLocation(0, 0), 0);
outputColumnMap.put(new InputLocation(0, 1), 1);
outputColumnMap.put(new InputLocation(1, 0), 2);
outputColumnMap.put(new InputLocation(1, 1), 3);
InnerTimeJoinOperator innerTimeJoinOperator =
new InnerTimeJoinOperator(
operatorContext,
Arrays.asList(child1, child2),
Arrays.asList(TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.BOOLEAN),
new DescTimeComparator(),
outputColumnMap);
long[] timeArray = new long[] {100L, 90L, 50L, 20L, 10L};
int[] column1Array = new int[] {100, 90, 50, 20, 10};
boolean[] column1IsNull = new boolean[] {false, false, false, false, false};
long[] column2Array = new long[] {200L, 180L, 100L, 40L, 20L};
boolean[] column2IsNull = new boolean[] {false, false, false, false, false};
float[] column3Array = new float[] {300.0f, 0.0f, 150.0f, 60.0f, 0.0f};
boolean[] column3IsNull = new boolean[] {false, true, false, false, true};
boolean[] column4Array = new boolean[] {false, false, true, false, false};
boolean[] column4IsNull = new boolean[] {true, false, false, true, false};
try {
int count = 0;
ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
listenableFuture.get();
while (!innerTimeJoinOperator.isFinished() && innerTimeJoinOperator.hasNext()) {
TsBlock tsBlock = innerTimeJoinOperator.next();
if (tsBlock != null && !tsBlock.isEmpty()) {
for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
if (!column1IsNull[count]) {
assertEquals(column1Array[count], tsBlock.getColumn(0).getInt(i));
}
assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
if (!column2IsNull[count]) {
assertEquals(column2Array[count], tsBlock.getColumn(1).getLong(i));
}
assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
if (!column3IsNull[count]) {
assertEquals(column3Array[count], tsBlock.getColumn(2).getFloat(i), 0.000001);
}
assertEquals(column4IsNull[count], tsBlock.getColumn(3).isNull(i));
if (!column4IsNull[count]) {
assertEquals(column4Array[count], tsBlock.getColumn(3).getBoolean(i));
}
}
}
listenableFuture = innerTimeJoinOperator.isBlocked();
listenableFuture.get();
}
assertEquals(timeArray.length, count);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testInnerJoin4() {
// child-1
// Time, s1, s2
// 100 100 200
// 90 90 180
// 80 80 160
// 70 70 140
// 60 60 120
// 50 50 100
// 40 40 80
// 30 30 60
// 20 20 40
// 10 10 20
// 0 0 0
// ---------------------- TsBlock-1
// child-2
// Time, s3, s4
// 1000 3000.0 false
// 500 500.0 true
// 100 300.0 null
// ------------------------- TsBlock-1
// 99 99.0 true
// 95 95.0 null
// 90 null false
// ------------------------- TsBlock-2
// 50 150.0 true
// 48 48.0 true
// 20 60.0 null
// 10 null false
// ------------------------- TsBlock-3
// child-3
// Time, s5,
// 1000 "iotdb"
// 500 "ty"
// 101 "zm"
// --------------------- TsBlock-1
// 99 "ty"
// 95 "love"
// 80 "zm"
// 60 "2018-05-06"
// --------------------- TsBlock-2
// 40 "1997-09-09"
// 22 "1995-04-21"
// 11 "2022-04-21"
// 0 "2023-12-30"
// --------------------- TsBlock-3
// result table
// Time, s1, s2, s3, s4, s5
// empty
OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1000, TimeUnit.SECONDS));
Operator child1 =
new Operator() {
private final long[][] timeArray =
new long[][] {{100L, 90L, 80L, 70L, 60L, 50L, 40L, 30L, 20L, 10L, 0L}};
private final int[][] value1Array =
new int[][] {{100, 90, 80, 70, 60, 50, 40, 30, 20, 10, 0}};
private final long[][] value2Array =
new long[][] {{200L, 180L, 160, 140, 120, 100L, 80L, 60L, 40L, 20L, 0L}};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{{false, false, false, false, false, false, false, false, false, false, false}},
{{false, false, false, false, false, false, false, false, false, false, false}}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
if (timeArray[index] == null) {
index++;
return null;
}
TsBlockBuilder builder =
new TsBlockBuilder(
timeArray[index].length, Arrays.asList(TSDataType.INT32, TSDataType.INT64));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeInt(value1Array[index][i]);
}
if (valueIsNull[1][index][i]) {
builder.getColumnBuilder(1).appendNull();
} else {
builder.getColumnBuilder(1).writeLong(value2Array[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 1;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 1;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Operator child2 =
new Operator() {
private final long[][] timeArray =
new long[][] {{1000L, 500L, 100L}, {99L, 95L, 90L}, {50L, 48L, 20L, 10L}};
private final float[][] value1Array =
new float[][] {
{3000.0f, 500.0f, 300.0f},
{99.0f, 95.0f, 0.0f},
{150.0f, 48.0f, 60.0f, 0.0f}
};
private final boolean[][] value2Array =
new boolean[][] {
{false, true, false},
{true, false, false},
{true, true, false, false}
};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{
{false, false, false},
{false, false, true},
{false, false, false, true}
},
{
{false, false, true},
{false, true, false},
{false, false, true, false}
}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
if (timeArray[index] == null) {
index++;
return null;
}
TsBlockBuilder builder =
new TsBlockBuilder(
timeArray[index].length, Arrays.asList(TSDataType.FLOAT, TSDataType.BOOLEAN));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeFloat(value1Array[index][i]);
}
if (valueIsNull[1][index][i]) {
builder.getColumnBuilder(1).appendNull();
} else {
builder.getColumnBuilder(1).writeBoolean(value2Array[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 3;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 3;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Operator child3 =
new Operator() {
private final long[][] timeArray =
new long[][] {{1000L, 500L, 101L}, {99L, 95L, 80L, 60L}, {40L, 22L, 11L, 0L}};
private final Binary[][] value1Array =
new Binary[][] {
{
new Binary("iotdb".getBytes(StandardCharsets.UTF_8)),
new Binary("ty".getBytes(StandardCharsets.UTF_8)),
new Binary("zm".getBytes(StandardCharsets.UTF_8))
},
{
new Binary("ty".getBytes(StandardCharsets.UTF_8)),
new Binary("love".getBytes(StandardCharsets.UTF_8)),
new Binary("zm".getBytes(StandardCharsets.UTF_8)),
new Binary("2018-05-06".getBytes(StandardCharsets.UTF_8))
},
{
new Binary("1997-09-09".getBytes(StandardCharsets.UTF_8)),
new Binary("1995-04-21".getBytes(StandardCharsets.UTF_8)),
new Binary("2022-04-21".getBytes(StandardCharsets.UTF_8)),
new Binary("2023-12-30".getBytes(StandardCharsets.UTF_8))
}
};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{
{false, false, false},
{false, false, false, false},
{false, false, false, false}
}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
if (timeArray[index] == null) {
index++;
return null;
}
TsBlockBuilder builder =
new TsBlockBuilder(timeArray[index].length, Arrays.asList(TSDataType.TEXT));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeBinary(value1Array[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 3;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 3;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Map<InputLocation, Integer> outputColumnMap = new HashMap<>();
outputColumnMap.put(new InputLocation(0, 0), 0);
outputColumnMap.put(new InputLocation(0, 1), 1);
outputColumnMap.put(new InputLocation(1, 0), 2);
outputColumnMap.put(new InputLocation(1, 1), 3);
outputColumnMap.put(new InputLocation(2, 0), 4);
InnerTimeJoinOperator innerTimeJoinOperator =
new InnerTimeJoinOperator(
operatorContext,
Arrays.asList(child1, child2, child3),
Arrays.asList(
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.BOOLEAN,
TSDataType.TEXT),
new DescTimeComparator(),
outputColumnMap);
try {
int count = 0;
ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
listenableFuture.get();
while (!innerTimeJoinOperator.isFinished() && innerTimeJoinOperator.hasNext()) {
TsBlock tsBlock = innerTimeJoinOperator.next();
if (tsBlock != null && !tsBlock.isEmpty()) {
count += tsBlock.getPositionCount();
}
listenableFuture = innerTimeJoinOperator.isBlocked();
listenableFuture.get();
}
assertEquals(0, count);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testInnerJoin5() {
// child-1
// Time, s1, s2
// 100 100 200
// 90 90 180
// 80 80 160
// 70 70 140
// 60 60 120
// 50 50 100
// 40 40 80
// 30 30 60
// 20 20 40
// 10 10 20
// 0 0 0
// ---------------------- TsBlock-1
// child-2
// Time, s3, s4
// 1000 3000.0 false
// 500 500.0 true
// 100 300.0 null
// ------------------------- TsBlock-1
// 99 99.0 true
// 95 95.0 null
// 90 null false
// ------------------------- TsBlock-2
// 50 150.0 true
// 48 48.0 true
// 20 60.0 null
// 10 null false
// ------------------------- TsBlock-3
// child-3
// Time, s5,
// 1000 "iotdb"
// 500 "ty"
// 101 "zm"
// --------------------- TsBlock-1
// 99 "ty"
// 90 "love"
// 80 "zm"
// 60 "2018-05-06"
// --------------------- TsBlock-2
// 40 "1997-09-09"
// 22 "1995-04-21"
// 11 "2022-04-21"
// 0 "2023-12-30"
// --------------------- TsBlock-3
// result table
// Time, s1, s2, s3, s4, s5
// 90 90 180 null false "love"
OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1000, TimeUnit.SECONDS));
Operator child1 =
new Operator() {
private final long[][] timeArray =
new long[][] {{100L, 90L, 80L, 70L, 60L, 50L, 40L, 30L, 20L, 10L, 0L}};
private final int[][] value1Array =
new int[][] {{100, 90, 80, 70, 60, 50, 40, 30, 20, 10, 0}};
private final long[][] value2Array =
new long[][] {{200L, 180L, 160, 140, 120, 100L, 80L, 60L, 40L, 20L, 0L}};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{{false, false, false, false, false, false, false, false, false, false, false}},
{{false, false, false, false, false, false, false, false, false, false, false}}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
if (timeArray[index] == null) {
index++;
return null;
}
TsBlockBuilder builder =
new TsBlockBuilder(
timeArray[index].length, Arrays.asList(TSDataType.INT32, TSDataType.INT64));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeInt(value1Array[index][i]);
}
if (valueIsNull[1][index][i]) {
builder.getColumnBuilder(1).appendNull();
} else {
builder.getColumnBuilder(1).writeLong(value2Array[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 1;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 1;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Operator child2 =
new Operator() {
private final long[][] timeArray =
new long[][] {{1000L, 500L, 100L}, {99L, 95L, 90L}, {50L, 48L, 20L, 10L}};
private final float[][] value1Array =
new float[][] {
{3000.0f, 500.0f, 300.0f},
{99.0f, 95.0f, 0.0f},
{150.0f, 48.0f, 60.0f, 0.0f}
};
private final boolean[][] value2Array =
new boolean[][] {
{false, true, false},
{true, false, false},
{true, true, false, false}
};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{
{false, false, false},
{false, false, true},
{false, false, false, true}
},
{
{false, false, true},
{false, true, false},
{false, false, true, false}
}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
if (timeArray[index] == null) {
index++;
return null;
}
TsBlockBuilder builder =
new TsBlockBuilder(
timeArray[index].length, Arrays.asList(TSDataType.FLOAT, TSDataType.BOOLEAN));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeFloat(value1Array[index][i]);
}
if (valueIsNull[1][index][i]) {
builder.getColumnBuilder(1).appendNull();
} else {
builder.getColumnBuilder(1).writeBoolean(value2Array[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 3;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 3;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Operator child3 =
new Operator() {
private final long[][] timeArray =
new long[][] {{1000L, 500L, 101L}, {99L, 90L, 80L, 60L}, {40L, 22L, 11L, 0L}};
private final Binary[][] value1Array =
new Binary[][] {
{
new Binary("iotdb".getBytes(StandardCharsets.UTF_8)),
new Binary("ty".getBytes(StandardCharsets.UTF_8)),
new Binary("zm".getBytes(StandardCharsets.UTF_8))
},
{
new Binary("ty".getBytes(StandardCharsets.UTF_8)),
new Binary("love".getBytes(StandardCharsets.UTF_8)),
new Binary("zm".getBytes(StandardCharsets.UTF_8)),
new Binary("2018-05-06".getBytes(StandardCharsets.UTF_8))
},
{
new Binary("1997-09-09".getBytes(StandardCharsets.UTF_8)),
new Binary("1995-04-21".getBytes(StandardCharsets.UTF_8)),
new Binary("2022-04-21".getBytes(StandardCharsets.UTF_8)),
new Binary("2023-12-30".getBytes(StandardCharsets.UTF_8))
}
};
private final boolean[][][] valueIsNull =
new boolean[][][] {
{
{false, false, false},
{false, false, false, false},
{false, false, false, false}
}
};
private int index = 0;
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public TsBlock next() {
if (timeArray[index] == null) {
index++;
return null;
}
TsBlockBuilder builder =
new TsBlockBuilder(timeArray[index].length, Arrays.asList(TSDataType.TEXT));
for (int i = 0, size = timeArray[index].length; i < size; i++) {
builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
if (valueIsNull[0][index][i]) {
builder.getColumnBuilder(0).appendNull();
} else {
builder.getColumnBuilder(0).writeBinary(value1Array[index][i]);
}
}
builder.declarePositions(timeArray[index].length);
index++;
return builder.build();
}
@Override
public boolean hasNext() {
return index < 3;
}
@Override
public void close() {}
@Override
public boolean isFinished() {
return index >= 3;
}
@Override
public long calculateMaxPeekMemory() {
return 64 * 1024;
}
@Override
public long calculateMaxReturnSize() {
return 64 * 1024;
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
return 0;
}
};
Map<InputLocation, Integer> outputColumnMap = new HashMap<>();
outputColumnMap.put(new InputLocation(0, 0), 0);
outputColumnMap.put(new InputLocation(0, 1), 1);
outputColumnMap.put(new InputLocation(1, 0), 2);
outputColumnMap.put(new InputLocation(1, 1), 3);
outputColumnMap.put(new InputLocation(2, 0), 4);
InnerTimeJoinOperator innerTimeJoinOperator =
new InnerTimeJoinOperator(
operatorContext,
Arrays.asList(child1, child2, child3),
Arrays.asList(
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.BOOLEAN,
TSDataType.TEXT),
new DescTimeComparator(),
outputColumnMap);
try {
int count = 0;
ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
listenableFuture.get();
while (!innerTimeJoinOperator.isFinished() && innerTimeJoinOperator.hasNext()) {
TsBlock tsBlock = innerTimeJoinOperator.next();
if (tsBlock != null && !tsBlock.isEmpty()) {
count += tsBlock.getPositionCount();
assertEquals(90, tsBlock.getTimeByIndex(0));
assertFalse(tsBlock.getColumn(0).isNull(0));
assertEquals(90, tsBlock.getColumn(0).getInt(0));
assertFalse(tsBlock.getColumn(1).isNull(0));
assertEquals(180L, tsBlock.getColumn(1).getLong(0));
assertTrue(tsBlock.getColumn(2).isNull(0));
assertFalse(tsBlock.getColumn(3).isNull(0));
assertFalse(tsBlock.getColumn(3).getBoolean(0));
assertFalse(tsBlock.getColumn(4).isNull(0));
assertEquals(
"love", tsBlock.getColumn(4).getBinary(0).getStringValue(StandardCharsets.UTF_8));
}
listenableFuture = innerTimeJoinOperator.isBlocked();
listenableFuture.get();
}
assertEquals(1, count);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}