blob: 042aa92b97755373a58625277924b6d689701e60 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.commons.udf.builtin;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
import org.apache.iotdb.udf.api.exception.UDFOutputSeriesDataTypeNotValidException;
import org.apache.iotdb.udf.api.type.Type;
import org.apache.commons.jexl3.JexlBuilder;
import org.apache.commons.jexl3.JexlEngine;
import org.apache.commons.jexl3.JexlScript;
import java.io.IOException;
import java.util.HashMap;
public class UDTFJexl implements UDTF {
private int inputSeriesNumber;
private TSDataType[] inputDataType;
private TSDataType outputDataType;
private JexlScript script;
private Evaluator evaluator;
@Override
public void validate(UDFParameterValidator validator) throws UDFException {
inputSeriesNumber = validator.getParameters().getChildExpressionsSize();
for (int i = 0; i < inputSeriesNumber; i++) {
validator.validateInputSeriesDataType(
i, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE, Type.TEXT, Type.BOOLEAN);
}
validator.validateRequiredAttribute("expr");
}
@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
throws UDFInputSeriesDataTypeNotValidException, UDFOutputSeriesDataTypeNotValidException,
MetadataException {
String expr = parameters.getString("expr");
JexlEngine jexl = new JexlBuilder().create();
script = jexl.createScript(expr);
inputDataType = new TSDataType[inputSeriesNumber];
for (int i = 0; i < inputSeriesNumber; i++) {
inputDataType[i] = UDFDataTypeTransformer.transformToTsDataType(parameters.getDataType(i));
}
outputDataType = probeOutputDataType();
if (inputSeriesNumber == 1) {
switch (inputDataType[0]) {
case INT32:
evaluator = new EvaluatorIntInput();
break;
case INT64:
evaluator = new EvaluatorLongInput();
break;
case FLOAT:
evaluator = new EvaluatorFloatInput();
break;
case DOUBLE:
evaluator = new EvaluatorDoubleInput();
break;
case TEXT:
evaluator = new EvaluatorStringInput();
break;
case BOOLEAN:
evaluator = new EvaluatorBooleanInput();
break;
default:
throw new UDFInputSeriesDataTypeNotValidException(
0,
UDFDataTypeTransformer.transformToUDFDataType(inputDataType[0]),
Type.INT32,
Type.INT64,
Type.FLOAT,
Type.DOUBLE,
Type.TEXT,
Type.BOOLEAN);
}
} else {
evaluator = new EvaluatorMulInput();
}
configurations
.setAccessStrategy(new RowByRowAccessStrategy())
.setOutputDataType(UDFDataTypeTransformer.transformToUDFDataType(outputDataType));
}
// 23, 23L, 23f, 23d, "string", true are hard codes for probing
private HashMap<TSDataType, Object> initialMap() {
HashMap<TSDataType, Object> map = new HashMap<>();
map.put(TSDataType.INT32, 23);
map.put(TSDataType.INT64, 23L);
map.put(TSDataType.FLOAT, 23f);
map.put(TSDataType.DOUBLE, 23d);
map.put(TSDataType.TEXT, "string");
map.put(TSDataType.BOOLEAN, true);
return map;
}
private TSDataType probeOutputDataType() throws UDFOutputSeriesDataTypeNotValidException {
// initial inputHardCodes to probe OutputDataType
HashMap<TSDataType, Object> map = initialMap();
Object[] inputHardCodes = new Object[inputSeriesNumber];
for (int i = 0; i < inputSeriesNumber; i++) {
inputHardCodes[i] = map.get(inputDataType[i]);
}
Object o = script.execute(null, inputHardCodes);
if (o instanceof Number) {
return TSDataType.DOUBLE;
} else if (o instanceof String) {
return TSDataType.TEXT;
} else if (o instanceof Boolean) {
return TSDataType.BOOLEAN;
} else {
throw new UDFOutputSeriesDataTypeNotValidException(0, "[Number, String, Boolean]");
}
}
@Override
public void transform(Row row, PointCollector collector)
throws IOException, UDFOutputSeriesDataTypeNotValidException,
UDFInputSeriesDataTypeNotValidException {
switch (outputDataType) {
case DOUBLE:
evaluator.evaluateDouble(row, collector);
break;
case TEXT:
evaluator.evaluateText(row, collector);
break;
case BOOLEAN:
evaluator.evaluateBoolean(row, collector);
break;
default:
// This will not happen.
throw new UDFOutputSeriesDataTypeNotValidException(0, "[Number, String, Boolean]");
}
}
private interface Evaluator {
void evaluateDouble(Row row, PointCollector collector)
throws IOException, UDFInputSeriesDataTypeNotValidException;
void evaluateText(Row row, PointCollector collector)
throws IOException, UDFInputSeriesDataTypeNotValidException;
void evaluateBoolean(Row row, PointCollector collector)
throws IOException, UDFInputSeriesDataTypeNotValidException;
}
private class EvaluatorIntInput implements Evaluator {
@Override
public void evaluateDouble(Row row, PointCollector collector) throws IOException {
collector.putDouble(
row.getTime(), ((Number) script.execute(null, row.getInt(0))).doubleValue());
}
@Override
public void evaluateText(Row row, PointCollector collector) throws IOException {
collector.putString(row.getTime(), (String) script.execute(null, row.getInt(0)));
}
@Override
public void evaluateBoolean(Row row, PointCollector collector) throws IOException {
collector.putBoolean(row.getTime(), (Boolean) script.execute(null, row.getInt(0)));
}
}
private class EvaluatorLongInput implements Evaluator {
@Override
public void evaluateDouble(Row row, PointCollector collector) throws IOException {
collector.putDouble(
row.getTime(), ((Number) script.execute(null, row.getLong(0))).doubleValue());
}
@Override
public void evaluateText(Row row, PointCollector collector) throws IOException {
collector.putString(row.getTime(), (String) script.execute(null, row.getLong(0)));
}
@Override
public void evaluateBoolean(Row row, PointCollector collector) throws IOException {
collector.putBoolean(row.getTime(), (Boolean) script.execute(null, row.getLong(0)));
}
}
private class EvaluatorFloatInput implements Evaluator {
@Override
public void evaluateDouble(Row row, PointCollector collector) throws IOException {
collector.putDouble(
row.getTime(), ((Number) script.execute(null, row.getFloat(0))).doubleValue());
}
@Override
public void evaluateText(Row row, PointCollector collector) throws IOException {
collector.putString(row.getTime(), (String) script.execute(null, row.getFloat(0)));
}
@Override
public void evaluateBoolean(Row row, PointCollector collector) throws IOException {
collector.putBoolean(row.getTime(), (Boolean) script.execute(null, row.getFloat(0)));
}
}
private class EvaluatorDoubleInput implements Evaluator {
@Override
public void evaluateDouble(Row row, PointCollector collector) throws IOException {
collector.putDouble(
row.getTime(), ((Number) script.execute(null, row.getDouble(0))).doubleValue());
}
@Override
public void evaluateText(Row row, PointCollector collector) throws IOException {
collector.putString(row.getTime(), (String) script.execute(null, row.getDouble(0)));
}
@Override
public void evaluateBoolean(Row row, PointCollector collector) throws IOException {
collector.putBoolean(row.getTime(), (Boolean) script.execute(null, row.getDouble(0)));
}
}
private class EvaluatorStringInput implements Evaluator {
@Override
public void evaluateDouble(Row row, PointCollector collector) throws IOException {
collector.putDouble(
row.getTime(), ((Number) script.execute(null, row.getString(0))).doubleValue());
}
@Override
public void evaluateText(Row row, PointCollector collector) throws IOException {
collector.putString(row.getTime(), (String) script.execute(null, row.getString(0)));
}
@Override
public void evaluateBoolean(Row row, PointCollector collector) throws IOException {
collector.putBoolean(row.getTime(), (Boolean) script.execute(null, row.getString(0)));
}
}
private class EvaluatorBooleanInput implements Evaluator {
@Override
public void evaluateDouble(Row row, PointCollector collector) throws IOException {
collector.putDouble(
row.getTime(), ((Number) script.execute(null, row.getBoolean(0))).doubleValue());
}
@Override
public void evaluateText(Row row, PointCollector collector) throws IOException {
collector.putString(row.getTime(), (String) script.execute(null, row.getBoolean(0)));
}
@Override
public void evaluateBoolean(Row row, PointCollector collector) throws IOException {
collector.putBoolean(row.getTime(), (Boolean) script.execute(null, row.getBoolean(0)));
}
}
private class EvaluatorMulInput implements Evaluator {
Object[] values = new Object[inputSeriesNumber];
@Override
public void evaluateDouble(Row row, PointCollector collector)
throws IOException, UDFInputSeriesDataTypeNotValidException {
getValues(row);
collector.putDouble(row.getTime(), ((Number) script.execute(null, values)).doubleValue());
}
@Override
public void evaluateText(Row row, PointCollector collector)
throws IOException, UDFInputSeriesDataTypeNotValidException {
getValues(row);
collector.putString(row.getTime(), (String) script.execute(null, values));
}
@Override
public void evaluateBoolean(Row row, PointCollector collector)
throws IOException, UDFInputSeriesDataTypeNotValidException {
getValues(row);
collector.putBoolean(row.getTime(), (Boolean) script.execute(null, values));
}
public void getValues(Row row) throws IOException, UDFInputSeriesDataTypeNotValidException {
for (int i = 0; i < inputSeriesNumber; i++) {
switch (inputDataType[i]) {
case INT32:
values[i] = row.getInt(i);
break;
case INT64:
values[i] = row.getLong(i);
break;
case FLOAT:
values[i] = row.getFloat(i);
break;
case DOUBLE:
values[i] = row.getDouble(i);
break;
case TEXT:
values[i] = row.getString(i);
break;
case BOOLEAN:
values[i] = row.getBoolean(i);
break;
default:
throw new UDFInputSeriesDataTypeNotValidException(
i,
UDFDataTypeTransformer.transformToUDFDataType(inputDataType[i]),
Type.INT32,
Type.INT64,
Type.FLOAT,
Type.DOUBLE,
Type.TEXT,
Type.BOOLEAN);
}
}
}
}
}