| /* |
| * |
| * * Licensed to the Apache Software Foundation (ASF) under one |
| * * or more contributor license agreements. See the NOTICE file |
| * * distributed with this work for additional information |
| * * regarding copyright ownership. The ASF licenses this file |
| * * to you under the Apache License, Version 2.0 (the |
| * * "License"); you may not use this file except in compliance |
| * * with the License. You may obtain a copy of the License at |
| * * |
| * * http://www.apache.org/licenses/LICENSE-2.0 |
| * * |
| * * Unless required by applicable law or agreed to in writing, |
| * * software distributed under the License is distributed on an |
| * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * * KIND, either express or implied. See the License for the |
| * * specific language governing permissions and limitations |
| * * under the License. |
| * |
| */ |
| |
| package org.apache.iotdb.db.queryengine.transformation.dag.transformer.ternary; |
| |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader; |
| import org.apache.iotdb.db.queryengine.transformation.api.YieldableState; |
| import org.apache.iotdb.db.queryengine.transformation.dag.transformer.Transformer; |
| |
| import org.apache.tsfile.enums.TSDataType; |
| |
| import java.io.IOException; |
| |
| public abstract class TernaryTransformer extends Transformer { |
| protected final LayerPointReader firstPointReader; |
| protected final LayerPointReader secondPointReader; |
| protected final LayerPointReader thirdPointReader; |
| |
| protected final TSDataType firstPointReaderDataType; |
| protected final TSDataType secondPointReaderDataType; |
| protected final TSDataType thirdPointReaderDataType; |
| |
| protected final boolean isFirstPointReaderConstant; |
| protected final boolean isSecondPointReaderConstant; |
| protected final boolean isThirdPointReaderConstant; |
| |
| protected final boolean isCurrentConstant; |
| |
| @Override |
| protected YieldableState yieldValue() throws Exception { |
| final YieldableState firstYieldableState = firstPointReader.yield(); |
| final YieldableState secondYieldableState = secondPointReader.yield(); |
| final YieldableState thirdYieldableState = thirdPointReader.yield(); |
| |
| if (YieldableState.NOT_YIELDABLE_NO_MORE_DATA.equals(firstYieldableState) |
| || YieldableState.NOT_YIELDABLE_NO_MORE_DATA.equals(secondYieldableState) |
| || YieldableState.NOT_YIELDABLE_NO_MORE_DATA.equals(thirdYieldableState)) { |
| return YieldableState.NOT_YIELDABLE_NO_MORE_DATA; |
| } |
| |
| if (YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA.equals(firstYieldableState) |
| || YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA.equals(secondYieldableState) |
| || YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA.equals(thirdYieldableState)) { |
| return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA; |
| } |
| |
| final YieldableState timeYieldState = yieldTime(); |
| if (!YieldableState.YIELDABLE.equals(timeYieldState)) { |
| return timeYieldState; |
| } |
| |
| if (firstPointReader.isCurrentNull() |
| || secondPointReader.isCurrentNull() |
| || thirdPointReader.isCurrentNull()) { |
| currentNull = true; |
| } else { |
| transformAndCache(); |
| } |
| |
| firstPointReader.readyForNext(); |
| secondPointReader.readyForNext(); |
| thirdPointReader.readyForNext(); |
| return YieldableState.YIELDABLE; |
| } |
| |
| private YieldableState yieldTime() throws Exception { |
| if (isCurrentConstant) { |
| return YieldableState.YIELDABLE; |
| } |
| |
| long firstTime = isFirstPointReaderConstant ? Long.MIN_VALUE : firstPointReader.currentTime(); |
| long secondTime = |
| isSecondPointReaderConstant ? Long.MIN_VALUE : secondPointReader.currentTime(); |
| long thirdTime = isThirdPointReaderConstant ? Long.MIN_VALUE : thirdPointReader.currentTime(); |
| |
| while (firstTime != secondTime || firstTime != thirdTime) { // the logic is similar to MergeSort |
| if (firstTime < secondTime) { |
| if (isFirstPointReaderConstant) { |
| firstTime = secondTime; |
| } else { |
| firstPointReader.readyForNext(); |
| final YieldableState firstYieldableState = firstPointReader.yield(); |
| if (!YieldableState.YIELDABLE.equals(firstYieldableState)) { |
| return firstYieldableState; |
| } |
| firstTime = firstPointReader.currentTime(); |
| } |
| } else if (secondTime < thirdTime) { |
| if (isSecondPointReaderConstant) { |
| secondTime = thirdTime; |
| } else { |
| secondPointReader.readyForNext(); |
| final YieldableState secondYieldableState = secondPointReader.yield(); |
| if (!YieldableState.YIELDABLE.equals(secondYieldableState)) { |
| return secondYieldableState; |
| } |
| secondTime = secondPointReader.currentTime(); |
| } |
| } else { |
| if (isThirdPointReaderConstant) { |
| thirdTime = firstTime; |
| } else { |
| thirdPointReader.readyForNext(); |
| final YieldableState thirdYieldableState = thirdPointReader.yield(); |
| if (!YieldableState.YIELDABLE.equals(thirdYieldableState)) { |
| return thirdYieldableState; |
| } |
| thirdTime = thirdPointReader.currentTime(); |
| } |
| } |
| } |
| |
| if (firstTime != Long.MIN_VALUE) { |
| cachedTime = firstTime; |
| } |
| return YieldableState.YIELDABLE; |
| } |
| |
| protected TernaryTransformer( |
| LayerPointReader firstPointReader, |
| LayerPointReader secondPointReader, |
| LayerPointReader thirdPointReader) { |
| this.firstPointReader = firstPointReader; |
| this.secondPointReader = secondPointReader; |
| this.thirdPointReader = thirdPointReader; |
| this.firstPointReaderDataType = firstPointReader.getDataType(); |
| this.secondPointReaderDataType = secondPointReader.getDataType(); |
| this.thirdPointReaderDataType = thirdPointReader.getDataType(); |
| this.isFirstPointReaderConstant = firstPointReader.isConstantPointReader(); |
| this.isSecondPointReaderConstant = secondPointReader.isConstantPointReader(); |
| this.isThirdPointReaderConstant = thirdPointReader.isConstantPointReader(); |
| this.isCurrentConstant = |
| isFirstPointReaderConstant && isSecondPointReaderConstant && isThirdPointReaderConstant; |
| checkType(); |
| } |
| |
| @Override |
| public boolean isConstantPointReader() { |
| return firstPointReader.isConstantPointReader() |
| && secondPointReader.isConstantPointReader() |
| && thirdPointReader.isConstantPointReader(); |
| } |
| |
| @Override |
| protected boolean cacheValue() throws QueryProcessException, IOException { |
| if (!firstPointReader.next() || !secondPointReader.next() || !thirdPointReader.next()) { |
| return false; |
| } |
| |
| if (!cacheTime()) { |
| return false; |
| } |
| |
| if (firstPointReader.isCurrentNull() |
| || secondPointReader.isCurrentNull() |
| || thirdPointReader.isCurrentNull()) { |
| currentNull = true; |
| } else { |
| transformAndCache(); |
| } |
| |
| firstPointReader.readyForNext(); |
| secondPointReader.readyForNext(); |
| thirdPointReader.readyForNext(); |
| return true; |
| } |
| |
| protected abstract void transformAndCache() throws QueryProcessException, IOException; |
| |
| protected abstract void checkType(); |
| |
| /** |
| * finds the smallest, unconsumed, same timestamp that exists in {@code firstPointReader}, {@code |
| * secondPointReader} and {@code thirdPointReader}and then caches the timestamp in {@code |
| * cachedTime}. |
| * |
| * @return true if there has a timestamp that meets the requirements |
| */ |
| private boolean cacheTime() throws IOException, QueryProcessException { |
| boolean isFirstConstant = firstPointReader.isConstantPointReader(); |
| boolean isSecondConstant = secondPointReader.isConstantPointReader(); |
| boolean isThirdConstant = thirdPointReader.isConstantPointReader(); |
| long firstTime = isFirstConstant ? Long.MIN_VALUE : firstPointReader.currentTime(); |
| long secondTime = isSecondConstant ? Long.MIN_VALUE : secondPointReader.currentTime(); |
| long thirdTime = isThirdConstant ? Long.MIN_VALUE : secondPointReader.currentTime(); |
| // Long.MIN_VALUE is used to determine whether (isFirstConstant and isSecondConstant and |
| // isThirdConstant) is true |
| while (firstTime != secondTime || firstTime != thirdTime) { // the logic is similar to MergeSort |
| if (firstTime < secondTime) { |
| if (isFirstConstant) { |
| firstTime = secondTime; |
| } else { |
| firstPointReader.readyForNext(); |
| if (!firstPointReader.next()) { |
| return false; |
| } |
| firstTime = firstPointReader.currentTime(); |
| } |
| } else if (secondTime < thirdTime) { |
| if (isSecondConstant) { |
| secondTime = thirdTime; |
| } else { |
| secondPointReader.readyForNext(); |
| if (!secondPointReader.next()) { |
| return false; |
| } |
| secondTime = secondPointReader.currentTime(); |
| } |
| } else { |
| if (isThirdConstant) { |
| thirdTime = firstTime; |
| } else { |
| thirdPointReader.readyForNext(); |
| if (!thirdPointReader.next()) { |
| return false; |
| } |
| thirdTime = secondPointReader.currentTime(); |
| } |
| } |
| } |
| |
| if (firstTime != Long.MIN_VALUE) { |
| cachedTime = firstTime; |
| } |
| return true; |
| } |
| |
| protected static double castCurrentValueToDoubleOperand( |
| LayerPointReader layerPointReader, TSDataType layerPointReaderDataType) |
| throws IOException, QueryProcessException { |
| switch (layerPointReaderDataType) { |
| case INT32: |
| return layerPointReader.currentInt(); |
| case INT64: |
| return layerPointReader.currentLong(); |
| case FLOAT: |
| return layerPointReader.currentFloat(); |
| case DOUBLE: |
| return layerPointReader.currentDouble(); |
| case BOOLEAN: |
| return layerPointReader.currentBoolean() ? 1.0d : 0.0d; |
| default: |
| throw new QueryProcessException( |
| "Unsupported data type: " + layerPointReader.getDataType().toString()); |
| } |
| } |
| } |