blob: 640612adab7052d0812d42d1bce491ba0bbbb428 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignByTimeDataSet {
protected TimeSelector timeHeap;
/** execute with value filter */
public UDTFAlignByTimeDataSet(
QueryContext context,
UDTFPlan udtfPlan,
List<PartialPath> deduplicatedPaths,
List<TSDataType> deduplicatedDataTypes,
TimeGenerator timestampGenerator,
List<IReaderByTimestamp> readersOfSelectedSeries,
List<Boolean> cached)
throws IOException, QueryProcessException {
super(
context,
udtfPlan,
deduplicatedPaths,
deduplicatedDataTypes,
timestampGenerator,
readersOfSelectedSeries,
cached);
initTimeHeap();
}
/** execute without value filter */
public UDTFAlignByTimeDataSet(
QueryContext context,
UDTFPlan udtfPlan,
List<PartialPath> deduplicatedPaths,
List<TSDataType> deduplicatedDataTypes,
List<ManagedSeriesReader> readersOfSelectedSeries)
throws QueryProcessException, IOException, InterruptedException {
super(context, udtfPlan, deduplicatedPaths, deduplicatedDataTypes, readersOfSelectedSeries);
initTimeHeap();
}
protected void initTimeHeap() throws IOException, QueryProcessException {
timeHeap = new TimeSelector(transformers.length << 1, true);
for (LayerPointReader reader : transformers) {
if (reader.next()) {
timeHeap.add(reader.currentTime());
}
}
}
@Override
public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
throws IOException, QueryProcessException {
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
int columnsNum = transformers.length;
PublicBAOS timeBAOS = new PublicBAOS();
PublicBAOS[] valueBAOSList = new PublicBAOS[columnsNum];
PublicBAOS[] bitmapBAOSList = new PublicBAOS[columnsNum];
for (int i = 0; i < columnsNum; ++i) {
valueBAOSList[i] = new PublicBAOS();
bitmapBAOSList[i] = new PublicBAOS();
}
int[] currentBitmapList = new int[columnsNum];
int rowCount = 0;
while (rowCount < fetchSize
&& (rowLimit <= 0 || alreadyReturnedRowNum < rowLimit)
&& !timeHeap.isEmpty()) {
long minTime = timeHeap.pollFirst();
if (rowOffset == 0) {
timeBAOS.write(BytesUtils.longToBytes(minTime));
}
for (int i = 0; i < columnsNum; ++i) {
LayerPointReader reader = transformers[i];
if (!reader.next() || reader.currentTime() != minTime) {
if (rowOffset == 0) {
currentBitmapList[i] = (currentBitmapList[i] << 1);
}
continue;
}
if (rowOffset == 0) {
currentBitmapList[i] = (currentBitmapList[i] << 1) | FLAG;
TSDataType type = reader.getDataType();
switch (type) {
case INT32:
int intValue = reader.currentInt();
ReadWriteIOUtils.write(
encoder != null && encoder.needEncode(minTime)
? encoder.encodeInt(intValue, minTime)
: intValue,
valueBAOSList[i]);
break;
case INT64:
long longValue = reader.currentLong();
ReadWriteIOUtils.write(
encoder != null && encoder.needEncode(minTime)
? encoder.encodeLong(longValue, minTime)
: longValue,
valueBAOSList[i]);
break;
case FLOAT:
float floatValue = reader.currentFloat();
ReadWriteIOUtils.write(
encoder != null && encoder.needEncode(minTime)
? encoder.encodeFloat(floatValue, minTime)
: floatValue,
valueBAOSList[i]);
break;
case DOUBLE:
double doubleValue = reader.currentDouble();
ReadWriteIOUtils.write(
encoder != null && encoder.needEncode(minTime)
? encoder.encodeDouble(doubleValue, minTime)
: doubleValue,
valueBAOSList[i]);
break;
case BOOLEAN:
ReadWriteIOUtils.write(reader.currentBoolean(), valueBAOSList[i]);
break;
case TEXT:
ReadWriteIOUtils.write(reader.currentBinary(), valueBAOSList[i]);
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", type));
}
}
reader.readyForNext();
if (reader.next()) {
timeHeap.add(reader.currentTime());
}
}
if (rowOffset == 0) {
++rowCount;
if (rowCount % 8 == 0) {
for (int i = 0; i < columnsNum; ++i) {
ReadWriteIOUtils.write((byte) currentBitmapList[i], bitmapBAOSList[i]);
currentBitmapList[i] = 0;
}
}
if (rowLimit > 0) {
++alreadyReturnedRowNum;
}
} else {
--rowOffset;
}
inputLayer.updateRowRecordListEvictionUpperBound();
}
/*
* feed the bitmap with remaining 0 in the right
* if current bitmap is 00011111 and remaining is 3, after feeding the bitmap is 11111000
*/
if (rowCount > 0) {
int remaining = rowCount % 8;
if (remaining != 0) {
for (int i = 0; i < columnsNum; ++i) {
ReadWriteIOUtils.write(
(byte) (currentBitmapList[i] << (8 - remaining)), bitmapBAOSList[i]);
}
}
}
return packBuffer(tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList);
}
protected TSQueryDataSet packBuffer(
TSQueryDataSet tsQueryDataSet,
PublicBAOS timeBAOS,
PublicBAOS[] valueBAOSList,
PublicBAOS[] bitmapBAOSList) {
int columnsNum = transformers.length;
ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size());
timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size());
timeBuffer.flip();
tsQueryDataSet.setTime(timeBuffer);
List<ByteBuffer> valueBufferList = new ArrayList<>();
List<ByteBuffer> bitmapBufferList = new ArrayList<>();
for (int i = 0; i < columnsNum; ++i) {
putPBOSToBuffer(valueBAOSList, valueBufferList, i);
putPBOSToBuffer(bitmapBAOSList, bitmapBufferList, i);
}
tsQueryDataSet.setValueList(valueBufferList);
tsQueryDataSet.setBitmapList(bitmapBufferList);
return tsQueryDataSet;
}
protected void putPBOSToBuffer(
PublicBAOS[] bitmapBAOSList, List<ByteBuffer> bitmapBufferList, int tsIndex) {
ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapBAOSList[tsIndex].size());
bitmapBuffer.put(bitmapBAOSList[tsIndex].getBuf(), 0, bitmapBAOSList[tsIndex].size());
bitmapBuffer.flip();
bitmapBufferList.add(bitmapBuffer);
}
@Override
public boolean hasNextWithoutConstraint() {
return !timeHeap.isEmpty();
}
@Override
public RowRecord nextWithoutConstraint() throws IOException {
long minTime = timeHeap.pollFirst();
RowRecord rowRecord = new RowRecord(minTime);
try {
for (LayerPointReader reader : transformers) {
if (!reader.next() || reader.currentTime() != minTime) {
rowRecord.addField(null);
continue;
}
Object value;
switch (reader.getDataType()) {
case INT32:
value = reader.currentInt();
break;
case INT64:
value = reader.currentLong();
break;
case FLOAT:
value = reader.currentFloat();
break;
case DOUBLE:
value = reader.currentDouble();
break;
case BOOLEAN:
value = reader.currentBoolean();
break;
case TEXT:
value = reader.currentBinary();
break;
default:
throw new UnSupportedDataTypeException("Unsupported data type.");
}
rowRecord.addField(value, reader.getDataType());
reader.readyForNext();
if (reader.next()) {
timeHeap.add(reader.currentTime());
}
}
} catch (QueryProcessException e) {
throw new IOException(e.getMessage());
}
inputLayer.updateRowRecordListEvictionUpperBound();
return rowRecord;
}
}