blob: 730ff1330bdc94c032b47e5f3ec5e7c4ebcb3fd6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.transformation.dag.util;
import org.apache.iotdb.commons.udf.utils.UDFBinaryTransformer;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader;
import org.apache.iotdb.db.queryengine.transformation.api.YieldableState;
import org.apache.iotdb.db.queryengine.transformation.dag.input.IUDFInputDataSet;
import org.apache.iotdb.db.queryengine.transformation.datastructure.row.ElasticSerializableRowRecordList;
import org.apache.iotdb.db.queryengine.transformation.datastructure.tv.ElasticSerializableTVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.io.IOException;
/**
 * Static helpers that copy data from a {@link LayerPointReader} or an {@link IUDFInputDataSet}
 * into an {@link ElasticSerializableTVList} or an {@link ElasticSerializableRowRecordList}.
 *
 * <p>Two families of methods are offered: the {@code yield*} methods report progress through a
 * {@link YieldableState}, while the {@code cache*} methods report progress through a count or a
 * boolean.
 */
public class LayerCacheUtils {

  /** Not instantiable: every member of this class is static. */
  private LayerCacheUtils() {}

  /**
   * Copies up to {@code pointNumber} points from {@code source} into {@code target} by calling
   * {@link #yieldPoint} repeatedly.
   *
   * @param dataType data type used to select the accessor for non-null values
   * @param source reader the points are taken from
   * @param target list the points are appended to
   * @param pointNumber maximum number of points to copy; nothing is copied when it is not positive
   * @return the first state other than {@link YieldableState#YIELDABLE} reported by {@link
   *     #yieldPoint}, or {@link YieldableState#YIELDABLE} if all {@code pointNumber} points were
   *     copied
   * @throws Exception if reading from the source or writing to the target fails
   */
  public static YieldableState yieldPoints(
      TSDataType dataType,
      LayerPointReader source,
      ElasticSerializableTVList target,
      int pointNumber)
      throws Exception {
    int count = 0;
    while (count < pointNumber) {
      final YieldableState yieldableState = yieldPoint(dataType, source, target);
      if (yieldableState != YieldableState.YIELDABLE) {
        return yieldableState;
      }
      ++count;
    }
    return YieldableState.YIELDABLE;
  }

  /**
   * Copies a single point from {@code source} into {@code target}.
   *
   * <p>The source is asked to yield first; only if it reports {@link YieldableState#YIELDABLE} is
   * the current point appended to the target and the source told to move on.
   *
   * @param dataType data type used to select the accessor for a non-null value
   * @param source reader the point is taken from
   * @param target list the point is appended to
   * @return the state reported by {@code source.yield()}; the target is only modified when that
   *     state is {@link YieldableState#YIELDABLE}
   * @throws UnsupportedOperationException if the current value is non-null and {@code dataType}
   *     is not one of INT32, INT64, FLOAT, DOUBLE, BOOLEAN or TEXT
   * @throws Exception if reading from the source or writing to the target fails
   */
  public static YieldableState yieldPoint(
      TSDataType dataType, LayerPointReader source, ElasticSerializableTVList target)
      throws Exception {
    final YieldableState yieldableState = source.yield();
    if (yieldableState != YieldableState.YIELDABLE) {
      return yieldableState;
    }

    appendCurrentPoint(dataType, source, target);
    source.readyForNext();
    return YieldableState.YIELDABLE;
  }

  /**
   * Copies up to {@code rowsNumber} rows from {@code source} into {@code target} by calling {@link
   * #yieldRow} repeatedly.
   *
   * @param source data set the rows are taken from
   * @param target list the rows are appended to
   * @param rowsNumber maximum number of rows to copy; nothing is copied when it is not positive
   * @return the first state other than {@link YieldableState#YIELDABLE} reported by {@link
   *     #yieldRow}, or {@link YieldableState#YIELDABLE} if all {@code rowsNumber} rows were copied
   * @throws Exception if reading from the source or writing to the target fails
   */
  public static YieldableState yieldRows(
      IUDFInputDataSet source, ElasticSerializableRowRecordList target, int rowsNumber)
      throws Exception {
    int count = 0;
    while (count < rowsNumber) {
      final YieldableState yieldableState = yieldRow(source, target);
      if (yieldableState != YieldableState.YIELDABLE) {
        return yieldableState;
      }
      ++count;
    }
    return YieldableState.YIELDABLE;
  }

  /**
   * Copies a single row from {@code source} into {@code target} if the source reports that a row
   * can be yielded.
   *
   * @param source data set the row is taken from
   * @param target list the row is appended to
   * @return the state reported by {@code source.canYieldNextRowInObjects()}; the target is only
   *     modified when that state is {@link YieldableState#YIELDABLE}
   * @throws Exception if reading from the source or writing to the target fails
   */
  public static YieldableState yieldRow(
      IUDFInputDataSet source, ElasticSerializableRowRecordList target) throws Exception {
    final YieldableState yieldableState = source.canYieldNextRowInObjects();
    if (yieldableState == YieldableState.YIELDABLE) {
      target.put(source.nextRowInObjects());
    }
    return yieldableState;
  }

  /**
   * Caches up to {@code pointNumber} points from {@code source} in {@code target} by calling
   * {@link #cachePoint} repeatedly until it returns {@code false} or the limit is reached.
   *
   * @param dataType data type used to select the accessor for non-null values
   * @param source reader the points are taken from
   * @param target list the points are appended to
   * @param pointNumber maximum number of points to cache
   * @return the number of points actually cached, which may be less than or equal to {@code
   *     pointNumber}
   * @throws QueryProcessException if the source or the target reports a query processing failure
   * @throws IOException if reading from the source or writing to the target fails
   */
  public static int cachePoints(
      TSDataType dataType,
      LayerPointReader source,
      ElasticSerializableTVList target,
      int pointNumber)
      throws QueryProcessException, IOException {
    int count = 0;
    while (count < pointNumber && cachePoint(dataType, source, target)) {
      ++count;
    }
    return count;
  }

  /**
   * Caches a single point from {@code source} in {@code target}.
   *
   * @param dataType data type used to select the accessor for a non-null value
   * @param source reader the point is taken from
   * @param target list the point is appended to
   * @return {@code false} if {@code source.next()} returned {@code false} (the target is left
   *     untouched), {@code true} if a point was appended
   * @throws UnsupportedOperationException if the current value is non-null and {@code dataType}
   *     is not one of INT32, INT64, FLOAT, DOUBLE, BOOLEAN or TEXT
   * @throws IOException if reading from the source or writing to the target fails
   * @throws QueryProcessException if the source or the target reports a query processing failure
   */
  public static boolean cachePoint(
      TSDataType dataType, LayerPointReader source, ElasticSerializableTVList target)
      throws IOException, QueryProcessException {
    if (!source.next()) {
      return false;
    }

    appendCurrentPoint(dataType, source, target);
    source.readyForNext();
    return true;
  }

  /**
   * Caches up to {@code rowsNumber} rows from {@code source} in {@code target} by calling {@link
   * #cacheRow} repeatedly until it returns {@code false} or the limit is reached.
   *
   * @param source data set the rows are taken from
   * @param target list the rows are appended to
   * @param rowsNumber maximum number of rows to cache
   * @return the number of rows actually cached, which may be less than or equal to {@code
   *     rowsNumber}
   * @throws QueryProcessException if the source or the target reports a query processing failure
   * @throws IOException if reading from the source or writing to the target fails
   */
  public static int cacheRows(
      IUDFInputDataSet source, ElasticSerializableRowRecordList target, int rowsNumber)
      throws QueryProcessException, IOException {
    int count = 0;
    while (count < rowsNumber && cacheRow(source, target)) {
      ++count;
    }
    return count;
  }

  /**
   * Caches a single row from {@code source} in {@code target} if the source has another row.
   *
   * @param source data set the row is taken from
   * @param target list the row is appended to
   * @return {@code true} if a row was appended, {@code false} if {@code
   *     source.hasNextRowInObjects()} returned {@code false}
   * @throws IOException if reading from the source or writing to the target fails
   * @throws QueryProcessException if the source or the target reports a query processing failure
   */
  public static boolean cacheRow(IUDFInputDataSet source, ElasticSerializableRowRecordList target)
      throws IOException, QueryProcessException {
    if (source.hasNextRowInObjects()) {
      target.put(source.nextRowInObjects());
      return true;
    } else {
      return false;
    }
  }

  /**
   * Appends the point currently exposed by {@code source} to {@code target}.
   *
   * <p>A null marker is stored when the source reports its current value as null; otherwise the
   * value is read with the accessor matching {@code dataType}. TEXT values are passed through
   * {@link UDFBinaryTransformer#transformToUDFBinary} before being stored. The source is not
   * advanced here; callers invoke {@code readyForNext()} themselves.
   *
   * <p>Shared by {@link #yieldPoint} and {@link #cachePoint} so both paths stay in sync.
   *
   * @throws UnsupportedOperationException if the current value is non-null and {@code dataType}
   *     is not one of INT32, INT64, FLOAT, DOUBLE, BOOLEAN or TEXT
   */
  private static void appendCurrentPoint(
      TSDataType dataType, LayerPointReader source, ElasticSerializableTVList target)
      throws IOException, QueryProcessException {
    if (source.isCurrentNull()) {
      target.putNull(source.currentTime());
      return;
    }

    switch (dataType) {
      case INT32:
        target.putInt(source.currentTime(), source.currentInt());
        break;
      case INT64:
        target.putLong(source.currentTime(), source.currentLong());
        break;
      case FLOAT:
        target.putFloat(source.currentTime(), source.currentFloat());
        break;
      case DOUBLE:
        target.putDouble(source.currentTime(), source.currentDouble());
        break;
      case BOOLEAN:
        target.putBoolean(source.currentTime(), source.currentBoolean());
        break;
      case TEXT:
        target.putBinary(
            source.currentTime(),
            UDFBinaryTransformer.transformToUDFBinary(source.currentBinary()));
        break;
      default:
        throw new UnsupportedOperationException(dataType.name());
    }
  }
}