| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.queryengine.execution.operator; |
| |
| import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; |
| import org.apache.iotdb.commons.exception.IllegalPathException; |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.path.AlignedPath; |
| import org.apache.iotdb.commons.path.MeasurementPath; |
| import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; |
| import org.apache.iotdb.db.queryengine.common.PlanFragmentId; |
| import org.apache.iotdb.db.queryengine.common.QueryId; |
| import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; |
| import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; |
| import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; |
| import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator; |
| import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator; |
| import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.DescTimeComparator; |
| import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger; |
| import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator; |
| import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; |
| import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; |
| import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; |
| import org.apache.iotdb.tsfile.exception.write.WriteProcessException; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.read.common.block.TsBlock; |
| import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn; |
| import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn; |
| import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn; |
| import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn; |
| import org.apache.iotdb.tsfile.read.common.block.column.IntColumn; |
| import org.apache.iotdb.tsfile.read.common.block.column.LongColumn; |
| import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; |
| import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| |
| import io.airlift.units.Duration; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; |
| import static org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.getDefaultSeriesScanOptions; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| public class AlignedSeriesScanOperatorTest { |
| |
| private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.AlignedSeriesScanOperatorTest"; |
| private static final List<MeasurementSchema> measurementSchemas = new ArrayList<>(); |
| |
| private static final List<TsFileResource> seqResources = new ArrayList<>(); |
| private static final List<TsFileResource> unSeqResources = new ArrayList<>(); |
| |
| private static final double DELTA = 0.000001; |
| |
| @BeforeClass |
| public static void setUp() throws MetadataException, IOException, WriteProcessException { |
| AlignedSeriesTestUtil.setUp( |
| measurementSchemas, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG); |
| } |
| |
| @AfterClass |
| public static void tearDown() throws IOException { |
| AlignedSeriesTestUtil.tearDown(seqResources, unSeqResources); |
| } |
| |
| @Test |
| public void batchTest1() throws Exception { |
| ExecutorService instanceNotificationExecutor = |
| IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); |
| try { |
| AlignedPath alignedPath = |
| new AlignedPath( |
| SERIES_SCAN_OPERATOR_TEST_SG + ".device0", |
| measurementSchemas.stream() |
| .map(MeasurementSchema::getMeasurementId) |
| .collect(Collectors.toList()), |
| measurementSchemas.stream() |
| .map(m -> (IMeasurementSchema) m) |
| .collect(Collectors.toList())); |
| QueryId queryId = new QueryId("stub_query"); |
| FragmentInstanceId instanceId = |
| new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); |
| FragmentInstanceStateMachine stateMachine = |
| new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); |
| FragmentInstanceContext fragmentInstanceContext = |
| createFragmentInstanceContext(instanceId, stateMachine); |
| DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); |
| PlanNodeId planNodeId = new PlanNodeId("1"); |
| driverContext.addOperatorContext( |
| 1, planNodeId, AlignedSeriesScanOperator.class.getSimpleName()); |
| |
| AlignedSeriesScanOperator seriesScanOperator = |
| new AlignedSeriesScanOperator( |
| driverContext.getOperatorContexts().get(0), |
| planNodeId, |
| alignedPath, |
| Ordering.ASC, |
| getDefaultSeriesScanOptions(alignedPath), |
| false, |
| null, |
| -1); |
| seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| int count = 0; |
| while (seriesScanOperator.hasNext()) { |
| TsBlock tsBlock = seriesScanOperator.next(); |
| assertEquals(6, tsBlock.getValueColumnCount()); |
| assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn); |
| assertTrue(tsBlock.getColumn(1) instanceof IntColumn); |
| assertTrue(tsBlock.getColumn(2) instanceof LongColumn); |
| assertTrue(tsBlock.getColumn(3) instanceof FloatColumn); |
| assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn); |
| assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn); |
| |
| for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) { |
| assertEquals(count, tsBlock.getTimeByIndex(i)); |
| int delta = 0; |
| if ((long) count < 200) { |
| delta = 20000; |
| } else if ((long) count < 260 |
| || ((long) count >= 300 && (long) count < 380) |
| || (long) count >= 400) { |
| delta = 10000; |
| } |
| assertEquals((delta + (long) count) % 2 == 0, tsBlock.getColumn(0).getBoolean(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(1).getInt(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(2).getLong(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(3).getFloat(i), DELTA); |
| assertEquals(delta + (long) count, tsBlock.getColumn(4).getDouble(i), DELTA); |
| assertEquals( |
| String.valueOf(delta + (long) count), tsBlock.getColumn(5).getBinary(i).toString()); |
| } |
| } |
| assertEquals(500, count); |
| } catch (IllegalPathException e) { |
| e.printStackTrace(); |
| fail(); |
| } finally { |
| instanceNotificationExecutor.shutdown(); |
| } |
| } |
| |
| @Test |
| public void batchTest2() throws Exception { |
| ExecutorService instanceNotificationExecutor = |
| IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); |
| try { |
| AlignedPath alignedPath1 = |
| new AlignedPath( |
| SERIES_SCAN_OPERATOR_TEST_SG + ".device0", |
| measurementSchemas.stream() |
| .map(MeasurementSchema::getMeasurementId) |
| .collect(Collectors.toList()), |
| measurementSchemas.stream() |
| .map(m -> (IMeasurementSchema) m) |
| .collect(Collectors.toList())); |
| QueryId queryId = new QueryId("stub_query"); |
| FragmentInstanceId instanceId = |
| new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); |
| FragmentInstanceStateMachine stateMachine = |
| new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); |
| FragmentInstanceContext fragmentInstanceContext = |
| createFragmentInstanceContext(instanceId, stateMachine); |
| DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); |
| PlanNodeId planNodeId1 = new PlanNodeId("1"); |
| driverContext.addOperatorContext( |
| 1, planNodeId1, AlignedSeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId2 = new PlanNodeId("2"); |
| driverContext.addOperatorContext( |
| 2, planNodeId2, AlignedSeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId3 = new PlanNodeId("3"); |
| driverContext.addOperatorContext(3, planNodeId3, SeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId4 = new PlanNodeId("4"); |
| driverContext.addOperatorContext(4, planNodeId4, SeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId5 = new PlanNodeId("5"); |
| driverContext.addOperatorContext(5, planNodeId5, SeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId6 = new PlanNodeId("6"); |
| driverContext.addOperatorContext(6, planNodeId6, SeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId7 = new PlanNodeId("7"); |
| driverContext.addOperatorContext(7, planNodeId7, SeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId8 = new PlanNodeId("8"); |
| driverContext.addOperatorContext(8, planNodeId8, SeriesScanOperator.class.getSimpleName()); |
| driverContext.addOperatorContext( |
| 9, new PlanNodeId("9"), FullOuterTimeJoinOperator.class.getSimpleName()); |
| AlignedSeriesScanOperator seriesScanOperator1 = |
| new AlignedSeriesScanOperator( |
| driverContext.getOperatorContexts().get(0), |
| planNodeId1, |
| alignedPath1, |
| Ordering.ASC, |
| getDefaultSeriesScanOptions(alignedPath1), |
| false, |
| null, |
| -1); |
| seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator1 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| AlignedPath alignedPath2 = |
| new AlignedPath( |
| SERIES_SCAN_OPERATOR_TEST_SG + ".device1", |
| measurementSchemas.stream() |
| .map(MeasurementSchema::getMeasurementId) |
| .collect(Collectors.toList()), |
| measurementSchemas.stream() |
| .map(m -> (IMeasurementSchema) m) |
| .collect(Collectors.toList())); |
| AlignedSeriesScanOperator seriesScanOperator2 = |
| new AlignedSeriesScanOperator( |
| driverContext.getOperatorContexts().get(1), |
| planNodeId2, |
| alignedPath2, |
| Ordering.ASC, |
| getDefaultSeriesScanOptions(alignedPath2), |
| false, |
| null, |
| -1); |
| seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator2 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| Set<String> allSensors = new HashSet<>(); |
| allSensors.add("sensor0"); |
| allSensors.add("sensor1"); |
| allSensors.add("sensor2"); |
| allSensors.add("sensor3"); |
| allSensors.add("sensor4"); |
| allSensors.add("sensor5"); |
| |
| MeasurementPath measurementPath3 = |
| new MeasurementPath( |
| SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.BOOLEAN); |
| |
| SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); |
| scanOptionsBuilder.withAllSensors(allSensors); |
| SeriesScanOperator seriesScanOperator3 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(2), |
| planNodeId3, |
| measurementPath3, |
| Ordering.ASC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator3 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| MeasurementPath measurementPath4 = |
| new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32); |
| SeriesScanOperator seriesScanOperator4 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(3), |
| planNodeId4, |
| measurementPath4, |
| Ordering.ASC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator4 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| MeasurementPath measurementPath5 = |
| new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor2", TSDataType.INT64); |
| SeriesScanOperator seriesScanOperator5 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(4), |
| planNodeId5, |
| measurementPath5, |
| Ordering.ASC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator5 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| MeasurementPath measurementPath6 = |
| new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor3", TSDataType.FLOAT); |
| SeriesScanOperator seriesScanOperator6 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(5), |
| planNodeId6, |
| measurementPath6, |
| Ordering.ASC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator6 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| MeasurementPath measurementPath7 = |
| new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor4", TSDataType.DOUBLE); |
| SeriesScanOperator seriesScanOperator7 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(6), |
| planNodeId7, |
| measurementPath7, |
| Ordering.ASC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator7 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| MeasurementPath measurementPath8 = |
| new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor5", TSDataType.TEXT); |
| SeriesScanOperator seriesScanOperator8 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(7), |
| planNodeId8, |
| measurementPath8, |
| Ordering.ASC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator8.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator8 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| FullOuterTimeJoinOperator timeJoinOperator = |
| new FullOuterTimeJoinOperator( |
| driverContext.getOperatorContexts().get(8), |
| Arrays.asList( |
| seriesScanOperator1, |
| seriesScanOperator2, |
| seriesScanOperator3, |
| seriesScanOperator4, |
| seriesScanOperator5, |
| seriesScanOperator6, |
| seriesScanOperator7, |
| seriesScanOperator8), |
| Ordering.ASC, |
| Arrays.asList( |
| TSDataType.BOOLEAN, |
| TSDataType.INT32, |
| TSDataType.INT64, |
| TSDataType.FLOAT, |
| TSDataType.DOUBLE, |
| TSDataType.TEXT, |
| TSDataType.BOOLEAN, |
| TSDataType.INT32, |
| TSDataType.INT64, |
| TSDataType.FLOAT, |
| TSDataType.DOUBLE, |
| TSDataType.TEXT, |
| TSDataType.BOOLEAN, |
| TSDataType.INT32, |
| TSDataType.INT64, |
| TSDataType.FLOAT, |
| TSDataType.DOUBLE, |
| TSDataType.TEXT), |
| Arrays.asList( |
| new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(0, 1), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(0, 2), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(0, 3), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(0, 4), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(0, 5), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 1), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 2), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 3), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 4), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 5), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(2, 0), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(3, 0), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(4, 0), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(5, 0), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(6, 0), new AscTimeComparator()), |
| new SingleColumnMerger(new InputLocation(7, 0), new AscTimeComparator())), |
| new AscTimeComparator()); |
| timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| int count = 0; |
| while (timeJoinOperator.isBlocked().isDone() && timeJoinOperator.hasNext()) { |
| TsBlock tsBlock = timeJoinOperator.next(); |
| assertEquals(18, tsBlock.getValueColumnCount()); |
| assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn); |
| assertTrue(tsBlock.getColumn(1) instanceof IntColumn); |
| assertTrue(tsBlock.getColumn(2) instanceof LongColumn); |
| assertTrue(tsBlock.getColumn(3) instanceof FloatColumn); |
| assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn); |
| assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn); |
| assertTrue(tsBlock.getColumn(6) instanceof BooleanColumn); |
| assertTrue(tsBlock.getColumn(7) instanceof IntColumn); |
| assertTrue(tsBlock.getColumn(8) instanceof LongColumn); |
| assertTrue(tsBlock.getColumn(9) instanceof FloatColumn); |
| assertTrue(tsBlock.getColumn(10) instanceof DoubleColumn); |
| assertTrue(tsBlock.getColumn(11) instanceof BinaryColumn); |
| assertTrue(tsBlock.getColumn(12) instanceof BooleanColumn); |
| assertTrue(tsBlock.getColumn(13) instanceof IntColumn); |
| assertTrue(tsBlock.getColumn(14) instanceof LongColumn); |
| assertTrue(tsBlock.getColumn(15) instanceof FloatColumn); |
| assertTrue(tsBlock.getColumn(16) instanceof DoubleColumn); |
| assertTrue(tsBlock.getColumn(17) instanceof BinaryColumn); |
| |
| for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) { |
| assertEquals(count, tsBlock.getTimeByIndex(i)); |
| int delta = 0; |
| if ((long) count < 200) { |
| delta = 20000; |
| } else if ((long) count < 260 |
| || ((long) count >= 300 && (long) count < 380) |
| || (long) count >= 400) { |
| delta = 10000; |
| } |
| assertEquals((delta + (long) count) % 2 == 0, tsBlock.getColumn(0).getBoolean(i)); |
| assertEquals((delta + (long) count) % 2 == 0, tsBlock.getColumn(6).getBoolean(i)); |
| assertEquals((delta + (long) count) % 2 == 0, tsBlock.getColumn(12).getBoolean(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(1).getInt(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(7).getInt(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(13).getInt(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(2).getLong(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(8).getLong(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(14).getLong(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(3).getFloat(i), DELTA); |
| assertEquals(delta + (long) count, tsBlock.getColumn(9).getFloat(i), DELTA); |
| assertEquals(delta + (long) count, tsBlock.getColumn(15).getFloat(i), DELTA); |
| assertEquals(delta + (long) count, tsBlock.getColumn(4).getDouble(i), DELTA); |
| assertEquals(delta + (long) count, tsBlock.getColumn(10).getDouble(i), DELTA); |
| assertEquals(delta + (long) count, tsBlock.getColumn(16).getDouble(i), DELTA); |
| assertEquals( |
| String.valueOf(delta + (long) count), tsBlock.getColumn(5).getBinary(i).toString()); |
| assertEquals( |
| String.valueOf(delta + (long) count), tsBlock.getColumn(11).getBinary(i).toString()); |
| assertEquals( |
| String.valueOf(delta + (long) count), tsBlock.getColumn(17).getBinary(i).toString()); |
| } |
| } |
| assertEquals(500, count); |
| } catch (IllegalPathException e) { |
| e.printStackTrace(); |
| fail(); |
| } finally { |
| instanceNotificationExecutor.shutdown(); |
| } |
| } |
| |
| /** order by time desc */ |
| @Test |
| public void batchTest3() throws Exception { |
| ExecutorService instanceNotificationExecutor = |
| IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); |
| try { |
| AlignedPath alignedPath1 = |
| new AlignedPath( |
| SERIES_SCAN_OPERATOR_TEST_SG + ".device0", |
| measurementSchemas.stream() |
| .map(MeasurementSchema::getMeasurementId) |
| .collect(Collectors.toList()), |
| measurementSchemas.stream() |
| .map(m -> (IMeasurementSchema) m) |
| .collect(Collectors.toList())); |
| QueryId queryId = new QueryId("stub_query"); |
| FragmentInstanceId instanceId = |
| new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); |
| FragmentInstanceStateMachine stateMachine = |
| new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); |
| FragmentInstanceContext fragmentInstanceContext = |
| createFragmentInstanceContext(instanceId, stateMachine); |
| DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); |
| PlanNodeId planNodeId1 = new PlanNodeId("1"); |
| driverContext.addOperatorContext( |
| 1, planNodeId1, AlignedSeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId2 = new PlanNodeId("2"); |
| driverContext.addOperatorContext( |
| 2, planNodeId2, AlignedSeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId3 = new PlanNodeId("3"); |
| driverContext.addOperatorContext(3, planNodeId3, SeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId4 = new PlanNodeId("4"); |
| driverContext.addOperatorContext(4, planNodeId4, SeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId5 = new PlanNodeId("5"); |
| driverContext.addOperatorContext(5, planNodeId5, SeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId6 = new PlanNodeId("6"); |
| driverContext.addOperatorContext(6, planNodeId6, SeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId7 = new PlanNodeId("7"); |
| driverContext.addOperatorContext(7, planNodeId7, SeriesScanOperator.class.getSimpleName()); |
| PlanNodeId planNodeId8 = new PlanNodeId("8"); |
| driverContext.addOperatorContext(8, planNodeId8, SeriesScanOperator.class.getSimpleName()); |
| driverContext.addOperatorContext( |
| 9, new PlanNodeId("9"), FullOuterTimeJoinOperator.class.getSimpleName()); |
| AlignedSeriesScanOperator seriesScanOperator1 = |
| new AlignedSeriesScanOperator( |
| driverContext.getOperatorContexts().get(0), |
| planNodeId1, |
| alignedPath1, |
| Ordering.DESC, |
| getDefaultSeriesScanOptions(alignedPath1), |
| false, |
| null, |
| -1); |
| seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator1 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| AlignedPath alignedPath2 = |
| new AlignedPath( |
| SERIES_SCAN_OPERATOR_TEST_SG + ".device1", |
| measurementSchemas.stream() |
| .map(MeasurementSchema::getMeasurementId) |
| .collect(Collectors.toList()), |
| measurementSchemas.stream() |
| .map(m -> (IMeasurementSchema) m) |
| .collect(Collectors.toList())); |
| AlignedSeriesScanOperator seriesScanOperator2 = |
| new AlignedSeriesScanOperator( |
| driverContext.getOperatorContexts().get(1), |
| planNodeId2, |
| alignedPath2, |
| Ordering.DESC, |
| getDefaultSeriesScanOptions(alignedPath2), |
| false, |
| null, |
| -1); |
| seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator2 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| Set<String> allSensors = new HashSet<>(); |
| allSensors.add("sensor0"); |
| allSensors.add("sensor1"); |
| allSensors.add("sensor2"); |
| allSensors.add("sensor3"); |
| allSensors.add("sensor4"); |
| allSensors.add("sensor5"); |
| |
| MeasurementPath measurementPath3 = |
| new MeasurementPath( |
| SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.BOOLEAN); |
| SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); |
| scanOptionsBuilder.withAllSensors(allSensors); |
| SeriesScanOperator seriesScanOperator3 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(2), |
| planNodeId3, |
| measurementPath3, |
| Ordering.DESC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator3 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| MeasurementPath measurementPath4 = |
| new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32); |
| SeriesScanOperator seriesScanOperator4 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(3), |
| planNodeId4, |
| measurementPath4, |
| Ordering.DESC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator4 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| MeasurementPath measurementPath5 = |
| new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor2", TSDataType.INT64); |
| SeriesScanOperator seriesScanOperator5 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(4), |
| planNodeId5, |
| measurementPath5, |
| Ordering.DESC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator5 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| MeasurementPath measurementPath6 = |
| new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor3", TSDataType.FLOAT); |
| SeriesScanOperator seriesScanOperator6 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(5), |
| planNodeId6, |
| measurementPath6, |
| Ordering.DESC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator6 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| MeasurementPath measurementPath7 = |
| new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor4", TSDataType.DOUBLE); |
| SeriesScanOperator seriesScanOperator7 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(6), |
| planNodeId7, |
| measurementPath7, |
| Ordering.DESC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator7 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| MeasurementPath measurementPath8 = |
| new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device2.sensor5", TSDataType.TEXT); |
| SeriesScanOperator seriesScanOperator8 = |
| new SeriesScanOperator( |
| driverContext.getOperatorContexts().get(7), |
| planNodeId8, |
| measurementPath8, |
| Ordering.DESC, |
| scanOptionsBuilder.build()); |
| seriesScanOperator8.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); |
| seriesScanOperator8 |
| .getOperatorContext() |
| .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| FullOuterTimeJoinOperator timeJoinOperator = |
| new FullOuterTimeJoinOperator( |
| driverContext.getOperatorContexts().get(8), |
| Arrays.asList( |
| seriesScanOperator1, |
| seriesScanOperator2, |
| seriesScanOperator3, |
| seriesScanOperator4, |
| seriesScanOperator5, |
| seriesScanOperator6, |
| seriesScanOperator7, |
| seriesScanOperator8), |
| Ordering.DESC, |
| Arrays.asList( |
| TSDataType.BOOLEAN, |
| TSDataType.INT32, |
| TSDataType.INT64, |
| TSDataType.FLOAT, |
| TSDataType.DOUBLE, |
| TSDataType.TEXT, |
| TSDataType.BOOLEAN, |
| TSDataType.INT32, |
| TSDataType.INT64, |
| TSDataType.FLOAT, |
| TSDataType.DOUBLE, |
| TSDataType.TEXT, |
| TSDataType.BOOLEAN, |
| TSDataType.INT32, |
| TSDataType.INT64, |
| TSDataType.FLOAT, |
| TSDataType.DOUBLE, |
| TSDataType.TEXT), |
| Arrays.asList( |
| new SingleColumnMerger(new InputLocation(0, 0), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(0, 1), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(0, 2), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(0, 3), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(0, 4), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(0, 5), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 0), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 1), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 2), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 3), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 4), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(1, 5), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(2, 0), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(3, 0), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(4, 0), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(5, 0), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(6, 0), new DescTimeComparator()), |
| new SingleColumnMerger(new InputLocation(7, 0), new DescTimeComparator())), |
| new DescTimeComparator()); |
| timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); |
| |
| int count = 499; |
| while (timeJoinOperator.isBlocked().isDone() && timeJoinOperator.hasNext()) { |
| TsBlock tsBlock = timeJoinOperator.next(); |
| assertEquals(18, tsBlock.getValueColumnCount()); |
| assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn); |
| assertTrue(tsBlock.getColumn(1) instanceof IntColumn); |
| assertTrue(tsBlock.getColumn(2) instanceof LongColumn); |
| assertTrue(tsBlock.getColumn(3) instanceof FloatColumn); |
| assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn); |
| assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn); |
| assertTrue(tsBlock.getColumn(6) instanceof BooleanColumn); |
| assertTrue(tsBlock.getColumn(7) instanceof IntColumn); |
| assertTrue(tsBlock.getColumn(8) instanceof LongColumn); |
| assertTrue(tsBlock.getColumn(9) instanceof FloatColumn); |
| assertTrue(tsBlock.getColumn(10) instanceof DoubleColumn); |
| assertTrue(tsBlock.getColumn(11) instanceof BinaryColumn); |
| assertTrue(tsBlock.getColumn(12) instanceof BooleanColumn); |
| assertTrue(tsBlock.getColumn(13) instanceof IntColumn); |
| assertTrue(tsBlock.getColumn(14) instanceof LongColumn); |
| assertTrue(tsBlock.getColumn(15) instanceof FloatColumn); |
| assertTrue(tsBlock.getColumn(16) instanceof DoubleColumn); |
| assertTrue(tsBlock.getColumn(17) instanceof BinaryColumn); |
| |
| for (int i = 0; i < tsBlock.getPositionCount(); i++, count--) { |
| assertEquals(count, tsBlock.getTimeByIndex(i)); |
| int delta = 0; |
| if ((long) count < 200) { |
| delta = 20000; |
| } else if ((long) count < 260 |
| || ((long) count >= 300 && (long) count < 380) |
| || (long) count >= 400) { |
| delta = 10000; |
| } |
| assertEquals((delta + (long) count) % 2 == 0, tsBlock.getColumn(0).getBoolean(i)); |
| assertEquals((delta + (long) count) % 2 == 0, tsBlock.getColumn(6).getBoolean(i)); |
| assertEquals((delta + (long) count) % 2 == 0, tsBlock.getColumn(12).getBoolean(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(1).getInt(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(7).getInt(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(13).getInt(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(2).getLong(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(8).getLong(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(14).getLong(i)); |
| assertEquals(delta + (long) count, tsBlock.getColumn(3).getFloat(i), DELTA); |
| assertEquals(delta + (long) count, tsBlock.getColumn(9).getFloat(i), DELTA); |
| assertEquals(delta + (long) count, tsBlock.getColumn(15).getFloat(i), DELTA); |
| assertEquals(delta + (long) count, tsBlock.getColumn(4).getDouble(i), DELTA); |
| assertEquals(delta + (long) count, tsBlock.getColumn(10).getDouble(i), DELTA); |
| assertEquals(delta + (long) count, tsBlock.getColumn(16).getDouble(i), DELTA); |
| assertEquals( |
| String.valueOf(delta + (long) count), tsBlock.getColumn(5).getBinary(i).toString()); |
| assertEquals( |
| String.valueOf(delta + (long) count), tsBlock.getColumn(11).getBinary(i).toString()); |
| assertEquals( |
| String.valueOf(delta + (long) count), tsBlock.getColumn(17).getBinary(i).toString()); |
| } |
| } |
| assertEquals(-1, count); |
| } catch (IllegalPathException e) { |
| e.printStackTrace(); |
| fail(); |
| } finally { |
| instanceNotificationExecutor.shutdown(); |
| } |
| } |
| } |