blob: a0ee4f215edc9f72fca21d7cf1913b65a0f574c1 [file] [log] [blame]
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing,
* * software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* * KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations
* * under the License.
*
*/
package org.apache.iotdb.db.queryengine.transformation.dag.transformer.ternary;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader;
import org.apache.iotdb.db.queryengine.transformation.api.YieldableState;
import org.apache.iotdb.db.queryengine.transformation.dag.transformer.Transformer;
import org.apache.tsfile.enums.TSDataType;
import java.io.IOException;
public abstract class TernaryTransformer extends Transformer {
protected final LayerPointReader firstPointReader;
protected final LayerPointReader secondPointReader;
protected final LayerPointReader thirdPointReader;
protected final TSDataType firstPointReaderDataType;
protected final TSDataType secondPointReaderDataType;
protected final TSDataType thirdPointReaderDataType;
protected final boolean isFirstPointReaderConstant;
protected final boolean isSecondPointReaderConstant;
protected final boolean isThirdPointReaderConstant;
protected final boolean isCurrentConstant;
@Override
protected YieldableState yieldValue() throws Exception {
final YieldableState firstYieldableState = firstPointReader.yield();
final YieldableState secondYieldableState = secondPointReader.yield();
final YieldableState thirdYieldableState = thirdPointReader.yield();
if (YieldableState.NOT_YIELDABLE_NO_MORE_DATA.equals(firstYieldableState)
|| YieldableState.NOT_YIELDABLE_NO_MORE_DATA.equals(secondYieldableState)
|| YieldableState.NOT_YIELDABLE_NO_MORE_DATA.equals(thirdYieldableState)) {
return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
}
if (YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA.equals(firstYieldableState)
|| YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA.equals(secondYieldableState)
|| YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA.equals(thirdYieldableState)) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
}
final YieldableState timeYieldState = yieldTime();
if (!YieldableState.YIELDABLE.equals(timeYieldState)) {
return timeYieldState;
}
if (firstPointReader.isCurrentNull()
|| secondPointReader.isCurrentNull()
|| thirdPointReader.isCurrentNull()) {
currentNull = true;
} else {
transformAndCache();
}
firstPointReader.readyForNext();
secondPointReader.readyForNext();
thirdPointReader.readyForNext();
return YieldableState.YIELDABLE;
}
private YieldableState yieldTime() throws Exception {
if (isCurrentConstant) {
return YieldableState.YIELDABLE;
}
long firstTime = isFirstPointReaderConstant ? Long.MIN_VALUE : firstPointReader.currentTime();
long secondTime =
isSecondPointReaderConstant ? Long.MIN_VALUE : secondPointReader.currentTime();
long thirdTime = isThirdPointReaderConstant ? Long.MIN_VALUE : thirdPointReader.currentTime();
while (firstTime != secondTime || firstTime != thirdTime) { // the logic is similar to MergeSort
if (firstTime < secondTime) {
if (isFirstPointReaderConstant) {
firstTime = secondTime;
} else {
firstPointReader.readyForNext();
final YieldableState firstYieldableState = firstPointReader.yield();
if (!YieldableState.YIELDABLE.equals(firstYieldableState)) {
return firstYieldableState;
}
firstTime = firstPointReader.currentTime();
}
} else if (secondTime < thirdTime) {
if (isSecondPointReaderConstant) {
secondTime = thirdTime;
} else {
secondPointReader.readyForNext();
final YieldableState secondYieldableState = secondPointReader.yield();
if (!YieldableState.YIELDABLE.equals(secondYieldableState)) {
return secondYieldableState;
}
secondTime = secondPointReader.currentTime();
}
} else {
if (isThirdPointReaderConstant) {
thirdTime = firstTime;
} else {
thirdPointReader.readyForNext();
final YieldableState thirdYieldableState = thirdPointReader.yield();
if (!YieldableState.YIELDABLE.equals(thirdYieldableState)) {
return thirdYieldableState;
}
thirdTime = thirdPointReader.currentTime();
}
}
}
if (firstTime != Long.MIN_VALUE) {
cachedTime = firstTime;
}
return YieldableState.YIELDABLE;
}
protected TernaryTransformer(
LayerPointReader firstPointReader,
LayerPointReader secondPointReader,
LayerPointReader thirdPointReader) {
this.firstPointReader = firstPointReader;
this.secondPointReader = secondPointReader;
this.thirdPointReader = thirdPointReader;
this.firstPointReaderDataType = firstPointReader.getDataType();
this.secondPointReaderDataType = secondPointReader.getDataType();
this.thirdPointReaderDataType = thirdPointReader.getDataType();
this.isFirstPointReaderConstant = firstPointReader.isConstantPointReader();
this.isSecondPointReaderConstant = secondPointReader.isConstantPointReader();
this.isThirdPointReaderConstant = thirdPointReader.isConstantPointReader();
this.isCurrentConstant =
isFirstPointReaderConstant && isSecondPointReaderConstant && isThirdPointReaderConstant;
checkType();
}
@Override
public boolean isConstantPointReader() {
return firstPointReader.isConstantPointReader()
&& secondPointReader.isConstantPointReader()
&& thirdPointReader.isConstantPointReader();
}
@Override
protected boolean cacheValue() throws QueryProcessException, IOException {
if (!firstPointReader.next() || !secondPointReader.next() || !thirdPointReader.next()) {
return false;
}
if (!cacheTime()) {
return false;
}
if (firstPointReader.isCurrentNull()
|| secondPointReader.isCurrentNull()
|| thirdPointReader.isCurrentNull()) {
currentNull = true;
} else {
transformAndCache();
}
firstPointReader.readyForNext();
secondPointReader.readyForNext();
thirdPointReader.readyForNext();
return true;
}
protected abstract void transformAndCache() throws QueryProcessException, IOException;
protected abstract void checkType();
/**
* finds the smallest, unconsumed, same timestamp that exists in {@code firstPointReader}, {@code
* secondPointReader} and {@code thirdPointReader}and then caches the timestamp in {@code
* cachedTime}.
*
* @return true if there has a timestamp that meets the requirements
*/
private boolean cacheTime() throws IOException, QueryProcessException {
boolean isFirstConstant = firstPointReader.isConstantPointReader();
boolean isSecondConstant = secondPointReader.isConstantPointReader();
boolean isThirdConstant = thirdPointReader.isConstantPointReader();
long firstTime = isFirstConstant ? Long.MIN_VALUE : firstPointReader.currentTime();
long secondTime = isSecondConstant ? Long.MIN_VALUE : secondPointReader.currentTime();
long thirdTime = isThirdConstant ? Long.MIN_VALUE : secondPointReader.currentTime();
// Long.MIN_VALUE is used to determine whether (isFirstConstant and isSecondConstant and
// isThirdConstant) is true
while (firstTime != secondTime || firstTime != thirdTime) { // the logic is similar to MergeSort
if (firstTime < secondTime) {
if (isFirstConstant) {
firstTime = secondTime;
} else {
firstPointReader.readyForNext();
if (!firstPointReader.next()) {
return false;
}
firstTime = firstPointReader.currentTime();
}
} else if (secondTime < thirdTime) {
if (isSecondConstant) {
secondTime = thirdTime;
} else {
secondPointReader.readyForNext();
if (!secondPointReader.next()) {
return false;
}
secondTime = secondPointReader.currentTime();
}
} else {
if (isThirdConstant) {
thirdTime = firstTime;
} else {
thirdPointReader.readyForNext();
if (!thirdPointReader.next()) {
return false;
}
thirdTime = secondPointReader.currentTime();
}
}
}
if (firstTime != Long.MIN_VALUE) {
cachedTime = firstTime;
}
return true;
}
protected static double castCurrentValueToDoubleOperand(
LayerPointReader layerPointReader, TSDataType layerPointReaderDataType)
throws IOException, QueryProcessException {
switch (layerPointReaderDataType) {
case INT32:
return layerPointReader.currentInt();
case INT64:
return layerPointReader.currentLong();
case FLOAT:
return layerPointReader.currentFloat();
case DOUBLE:
return layerPointReader.currentDouble();
case BOOLEAN:
return layerPointReader.currentBoolean() ? 1.0d : 0.0d;
default:
throw new QueryProcessException(
"Unsupported data type: " + layerPointReader.getDataType().toString());
}
}
}