// blob: 654221963f71a04929b2d74cfb0539fece44c254
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.udf.builtin;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.udf.api.UDTF;
import org.apache.iotdb.db.query.udf.api.access.Row;
import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
import org.apache.iotdb.db.query.udf.api.exception.UDFException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.IOMonitor2;
import org.apache.iotdb.tsfile.read.common.IOMonitor2.DataSetType;
import java.io.IOException;
// This is the UDFM4 described in the paper.
// The integration test for MAC is org.apache.iotdb.db.integration.m4.MyTest3.test1_2.
/**
 * UDTF that splits the query range {@code [tqs, tqe)} into {@code w} equal-length intervals and,
 * for every interval, reports four representative points of a single numeric input series: the
 * first point (smallest timestamp), the last point (largest timestamp), the bottom point
 * (smallest value) and the top point (largest value).
 *
 * <p>Rows are consumed one by one. Nothing is emitted while rows are being consumed; in {@link
 * #terminate(PointCollector)} one TEXT value per interval is emitted, timestamped with the
 * interval's start time. An interval that received no point is reported as {@code "empty"}.
 *
 * <p>Required attributes: {@code tqs} (inclusive start), {@code tqe} (exclusive end) and {@code w}
 * (number of intervals). {@code tqe - tqs} must be a positive multiple of {@code w}.
 */
public class UDTFM4MAC implements UDTF {

  /** Text reported for an interval that did not receive any point. */
  private static final String EMPTY_INTERVAL = "empty";

  /** Message of the exception raised for a point outside {@code [tqs, tqe)}. */
  private static final String OUT_OF_RANGE_MESSAGE =
      "Make sure the range time filter is time>=tqs and time<tqe";

  protected TSDataType dataType;
  protected long tqs;
  protected long tqe;
  protected int w;

  // Timestamps of the first / last / bottom / top point of the interval being accumulated.
  private long minTime;
  private long maxTime;
  private long bottomTime;
  private long topTime;

  // Per-type accumulators; only the set matching the input series type is ever updated.
  private int intMaxV;
  private long longMaxV;
  private float floatMaxV;
  private double doubleMaxV;

  private int intMinV;
  private long longMinV;
  private float floatMinV;
  private double doubleMinV;

  private int intFirstV;
  private long longFirstV;
  private float floatFirstV;
  private double doubleFirstV;

  private int intLastV;
  private long longLastV;
  private float floatLastV;
  private double doubleLastV;

  /**
   * Whether the bottom/top accumulators of the current interval already hold a real value. While
   * false, the next usable value seeds both of them unconditionally, so the result does not
   * depend on comparing against sentinel values.
   */
  private boolean extremesSeeded;

  /** One result text per interval, indexed by interval position. */
  private String[] result;

  /** Position of the interval currently being accumulated, or -1 before the first point. */
  private int idx;

  /** Resets all accumulators so that a new interval can be collected. */
  private void init() {
    this.minTime = Long.MAX_VALUE;
    this.maxTime = Long.MIN_VALUE;
    this.bottomTime = 0;
    this.topTime = 0;
    this.extremesSeeded = false;

    this.intFirstV = 0;
    this.longFirstV = 0;
    this.floatFirstV = 0;
    this.doubleFirstV = 0;

    this.intLastV = 0;
    this.longLastV = 0;
    this.floatLastV = 0;
    this.doubleLastV = 0;

    this.intMinV = Integer.MAX_VALUE;
    this.longMinV = Long.MAX_VALUE;
    this.floatMinV = Float.POSITIVE_INFINITY;
    this.doubleMinV = Double.POSITIVE_INFINITY;

    // Note: Float.MIN_VALUE / Double.MIN_VALUE are the smallest POSITIVE values and therefore
    // must not be used as the starting point of a maximum search.
    this.intMaxV = Integer.MIN_VALUE;
    this.longMaxV = Long.MIN_VALUE;
    this.floatMaxV = Float.NEGATIVE_INFINITY;
    this.doubleMaxV = Double.NEGATIVE_INFINITY;
  }

  /**
   * Declares the accepted input: exactly one INT32/INT64/FLOAT/DOUBLE series and the required
   * attributes {@code tqs}, {@code tqe} and {@code w}. Also sets the static {@code
   * IOMonitor2.dataSetType} field.
   *
   * @param validator validator supplied by the UDF framework
   * @throws UDFException declared by the interface; raised by the validator calls
   */
  @Override
  public void validate(UDFParameterValidator validator) throws UDFException {
    IOMonitor2.dataSetType = DataSetType.UDTFAlignByTimeDataSet_M4_POINT;
    validator
        .validateInputSeriesNumber(1)
        .validateInputSeriesDataType(
            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE)
        .validateRequiredAttribute("tqs")
        .validateRequiredAttribute("tqe")
        .validateRequiredAttribute("w");
  }

  /**
   * Reads and checks the query parameters, configures row-by-row access with TEXT output and
   * prepares the per-interval result buffer (every interval starts as {@code "empty"}).
   *
   * @param parameters UDF parameters providing the input data type and the attributes
   * @param configurations configuration object receiving access strategy and output type
   * @throws MetadataException if {@code w} is not positive, {@code tqe} is not larger than {@code
   *     tqs}, or {@code tqe - tqs} is not a multiple of {@code w}
   */
  @Override
  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
      throws MetadataException {
    dataType = parameters.getDataType(0);
    tqs = parameters.getLong("tqs"); // closed
    tqe = parameters.getLong("tqe"); // open
    w = parameters.getInt("w");
    if (w <= 0) {
      // Guards the modulo/division below and the result array allocation.
      throw new MetadataException("w should be a positive integer, but got " + w);
    }
    // The second condition also rejects a range so wide that tqe - tqs overflows.
    if (tqe <= tqs || tqe - tqs <= 0) {
      throw new MetadataException(
          "tqe should be larger than tqs, but got tqs=" + tqs + ", tqe=" + tqe);
    }
    if ((tqe - tqs) % w != 0) {
      throw new MetadataException("You should make tqe-tqs integer divide w");
    }
    configurations
        .setAccessStrategy(new RowByRowAccessStrategy())
        .setOutputDataType(TSDataType.TEXT);
    init();
    this.idx = -1;
    result = new String[w];
    for (int i = 0; i < w; i++) {
      result[i] = EMPTY_INTERVAL;
    }
  }

  /**
   * Feeds one input row into the accumulator of the interval it belongs to. Rows of an
   * unsupported data type are ignored.
   *
   * @param row the current input row
   * @param collector unused here; all output is produced in {@link #terminate(PointCollector)}
   * @throws IOException if the row's timestamp lies outside {@code [tqs, tqe)}
   */
  @Override
  public void transform(Row row, PointCollector collector)
      throws QueryProcessException, IOException {
    switch (dataType) {
      case INT32:
        transformInt(row.getTime(), row.getInt(0));
        break;
      case INT64:
        transformLong(row.getTime(), row.getLong(0));
        break;
      case FLOAT:
        transformFloat(row.getTime(), row.getFloat(0));
        break;
      case DOUBLE:
        transformDouble(row.getTime(), row.getDouble(0));
        break;
      default:
        break;
    }
  }

  /**
   * Accumulates one INT32 point.
   *
   * @param time timestamp of the point, expected in {@code [tqs, tqe)}
   * @param value value of the point
   * @throws IOException if {@code time} lies outside {@code [tqs, tqe)}
   */
  protected void transformInt(long time, int value) throws IOException {
    moveToInterval(locateInterval(time), TSDataType.INT32);
    if (time < minTime) {
      minTime = time;
      intFirstV = value;
    }
    if (time > maxTime) {
      maxTime = time;
      intLastV = value;
    }
    if (!extremesSeeded || value < intMinV) {
      bottomTime = time;
      intMinV = value;
    }
    if (!extremesSeeded || value > intMaxV) {
      topTime = time;
      intMaxV = value;
    }
    extremesSeeded = true;
  }

  /**
   * Accumulates one INT64 point.
   *
   * @param time timestamp of the point, expected in {@code [tqs, tqe)}
   * @param value value of the point
   * @throws IOException if {@code time} lies outside {@code [tqs, tqe)}
   */
  protected void transformLong(long time, long value) throws IOException {
    moveToInterval(locateInterval(time), TSDataType.INT64);
    if (time < minTime) {
      minTime = time;
      longFirstV = value;
    }
    if (time > maxTime) {
      maxTime = time;
      longLastV = value;
    }
    if (!extremesSeeded || value < longMinV) {
      bottomTime = time;
      longMinV = value;
    }
    if (!extremesSeeded || value > longMaxV) {
      topTime = time;
      longMaxV = value;
    }
    extremesSeeded = true;
  }

  /**
   * Accumulates one FLOAT point. NaN values never become the bottom or top point.
   *
   * @param time timestamp of the point, expected in {@code [tqs, tqe)}
   * @param value value of the point
   * @throws IOException if {@code time} lies outside {@code [tqs, tqe)}
   */
  protected void transformFloat(long time, float value) throws IOException {
    moveToInterval(locateInterval(time), TSDataType.FLOAT);
    if (time < minTime) {
      minTime = time;
      floatFirstV = value;
    }
    if (time > maxTime) {
      maxTime = time;
      floatLastV = value;
    }
    if (!Float.isNaN(value)) {
      if (!extremesSeeded || value < floatMinV) {
        bottomTime = time;
        floatMinV = value;
      }
      if (!extremesSeeded || value > floatMaxV) {
        topTime = time;
        floatMaxV = value;
      }
      extremesSeeded = true;
    }
  }

  /**
   * Accumulates one DOUBLE point. NaN values never become the bottom or top point.
   *
   * @param time timestamp of the point, expected in {@code [tqs, tqe)}
   * @param value value of the point
   * @throws IOException if {@code time} lies outside {@code [tqs, tqe)}
   */
  protected void transformDouble(long time, double value) throws IOException {
    moveToInterval(locateInterval(time), TSDataType.DOUBLE);
    if (time < minTime) {
      minTime = time;
      doubleFirstV = value;
    }
    if (time > maxTime) {
      maxTime = time;
      doubleLastV = value;
    }
    if (!Double.isNaN(value)) {
      if (!extremesSeeded || value < doubleMinV) {
        bottomTime = time;
        doubleMinV = value;
      }
      if (!extremesSeeded || value > doubleMaxV) {
        topTime = time;
        doubleMaxV = value;
      }
      extremesSeeded = true;
    }
  }

  /**
   * Stores the text of the interval still being accumulated (if any point was seen) and emits one
   * TEXT value per interval, timestamped with the interval's start time.
   *
   * @param collector receives the {@code w} result points
   */
  @Override
  public void terminate(PointCollector collector) throws IOException, QueryProcessException {
    if (idx >= 0) { // at least one point was seen
      // The interval of the most recent point (not necessarily w-1) has not been stored yet.
      String summary = describeInterval(dataType);
      if (summary != null) {
        result[idx] = summary;
      }
    }
    long intervalLen = (tqe - tqs) / w;
    for (int i = 0; i < w; i++) {
      collector.putString(tqs + intervalLen * i, result[i]);
    }
  }

  /**
   * Maps a timestamp to the position of the interval containing it, using exact integer
   * arithmetic.
   *
   * @throws IOException if {@code time} lies outside {@code [tqs, tqe)}
   */
  private int locateInterval(long time) throws IOException {
    if (time < tqs || time >= tqe) {
      throw new IOException(OUT_OF_RANGE_MESSAGE);
    }
    long intervalLen = (tqe - tqs) / w;
    // 0 <= time - tqs < w * intervalLen, hence the quotient lies in [0, w) and fits an int.
    return (int) ((time - tqs) / intervalLen);
  }

  /**
   * Switches accumulation to interval {@code pos} if it lies after the current one: the finished
   * interval (if there is one) is stored in {@code result} and the accumulators are reset. A
   * position at or before the current interval changes nothing, so such a point is merged into
   * the current interval (NOTE(review): this presumably relies on rows arriving in ascending
   * time order - confirm against the access strategy).
   */
  private void moveToInterval(int pos, TSDataType type) {
    if (pos <= idx) {
      return;
    }
    if (idx >= 0) { // nothing to store before the very first point
      result[idx] = describeInterval(type);
    }
    idx = pos;
    init(); // clear environment for this new interval
  }

  /**
   * Renders the accumulators of the given type as result text, or returns {@code null} for a
   * type that is not accumulated by this class.
   */
  private String describeInterval(TSDataType type) {
    switch (type) {
      case INT32:
        return formatInterval(intFirstV, intLastV, intMinV, intMaxV);
      case INT64:
        return formatInterval(longFirstV, longLastV, longMinV, longMaxV);
      case FLOAT:
        return formatInterval(floatFirstV, floatLastV, floatMinV, floatMaxV);
      case DOUBLE:
        return formatInterval(doubleFirstV, doubleLastV, doubleMinV, doubleMaxV);
      default:
        return null;
    }
  }

  /** Builds the result text of the current interval from the four representative values. */
  private String formatInterval(Object firstV, Object lastV, Object bottomV, Object topV) {
    return "FirstPoint=("
        + minTime
        + ","
        + firstV
        + "), "
        + "LastPoint=("
        + maxTime
        + ","
        + lastV
        + "), "
        + "BottomPoint=("
        + bottomTime
        + ","
        + bottomV
        + "), "
        + "TopPoint=("
        + topTime
        + ","
        + topV
        + ")";
  }
}