blob: 0c0d99b4a5e780048065a3d5d153997c58e4a997 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.transformation.dag.udf;
import org.apache.iotdb.commons.udf.service.UDFManagementService;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
import org.apache.iotdb.db.queryengine.transformation.datastructure.tv.ElasticSerializableTVList;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.access.RowWindow;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
/**
 * Drives a single {@link UDTF} instance through its lifecycle: reflection and validation, {@code
 * beforeStart}, the {@code transform} variants, {@code terminate} and {@code beforeDestroy}.
 *
 * <p>Any exception thrown by the UDTF during validation, {@code beforeStart}, {@code transform} or
 * {@code terminate} is logged and rethrown as a {@link RuntimeException} whose message names the
 * failing UDTF method and whose cause is the original exception.
 */
public class UDTFExecutor {

  private static final Logger LOGGER = LoggerFactory.getLogger(UDTFExecutor.class);

  protected final String functionName;
  protected final UDTFConfigurations configurations;

  /** The reflected UDTF instance; {@code null} until {@link #beforeStart} has been called. */
  protected UDTF udtf;

  /**
   * Receives the points produced by the collector-based {@code transform} variants. It is only
   * created in {@link #beforeStart} when the access strategy is not {@code MAPPABLE_ROW_BY_ROW}.
   */
  protected ElasticSerializableTVList collector;

  /** Result of the most recent successful {@link #execute(Row)} call. */
  protected Object currentValue;

  /**
   * Creates an executor for the given function.
   *
   * @param functionName name used to look the UDTF up in {@link UDFManagementService}
   * @param zoneId zone id handed to the {@link UDTFConfigurations} of this executor
   */
  public UDTFExecutor(String functionName, ZoneId zoneId) {
    this.functionName = functionName;
    configurations = new UDTFConfigurations(zoneId);
  }

  /**
   * Reflects the UDTF, validates its parameters, runs its {@code beforeStart} hook, checks the
   * resulting configurations and, unless the UDTF is mappable row-by-row, allocates the collector.
   *
   * @param queryId id of the query, passed to the collector factory
   * @param collectorMemoryBudgetInMB memory budget passed to the collector factory
   * @param childExpressions input expressions used to build the UDF parameters
   * @param childExpressionDataTypes data types of {@code childExpressions}
   * @param attributes user supplied attributes used to build the UDF parameters
   * @throws RuntimeException if the UDTF's {@code validate} or {@code beforeStart} throws
   */
  public void beforeStart(
      String queryId,
      float collectorMemoryBudgetInMB,
      List<String> childExpressions,
      List<TSDataType> childExpressionDataTypes,
      Map<String, String> attributes) {
    reflectAndValidateUDF(childExpressions, childExpressionDataTypes, attributes);
    configurations.check();

    // Mappable UDF does not need PointCollector
    if (!AccessStrategy.AccessStrategyType.MAPPABLE_ROW_BY_ROW.equals(
        configurations.getAccessStrategy().getAccessStrategyType())) {
      collector =
          ElasticSerializableTVList.newElasticSerializableTVList(
              UDFDataTypeTransformer.transformToTsDataType(configurations.getOutputDataType()),
              queryId,
              collectorMemoryBudgetInMB,
              1);
    }
  }

  /**
   * Instantiates the UDTF by name, then calls its {@code validate} and {@code beforeStart} hooks
   * with parameters built from the given expressions and attributes.
   */
  private void reflectAndValidateUDF(
      List<String> childExpressions,
      List<TSDataType> childExpressionDataTypes,
      Map<String, String> attributes) {
    udtf = (UDTF) UDFManagementService.getInstance().reflect(functionName);

    final UDFParameters parameters =
        UDFParametersFactory.buildUdfParameters(
            childExpressions, childExpressionDataTypes, attributes);

    try {
      udtf.validate(new UDFParameterValidator(parameters));
    } catch (Exception e) {
      onError("validate(UDFParameterValidator)", e);
    }

    try {
      udtf.beforeStart(parameters, configurations);
    } catch (Exception e) {
      onError("beforeStart(UDFParameters, UDTFConfigurations)", e);
    }
  }

  /**
   * Feeds one row to the UDTF, writing results into the collector.
   *
   * @param row the input row
   * @param isCurrentRowNull when {@code true} the UDTF is not invoked and a null is recorded in
   *     the collector at the row's timestamp instead
   * @throws RuntimeException if the UDTF or the collector throws
   */
  public void execute(Row row, boolean isCurrentRowNull) {
    try {
      if (isCurrentRowNull) {
        // A null row will never trigger any UDF computing
        collector.putNull(row.getTime());
      } else {
        udtf.transform(row, collector);
      }
    } catch (Exception e) {
      onError("transform(Row, PointCollector)", e);
    }
  }

  /**
   * Feeds one row to the UDTF's value-returning {@code transform} and stores the result so it can
   * be read through {@link #getCurrentValue()}.
   *
   * @param row the input row
   * @throws RuntimeException if the UDTF throws
   */
  public void execute(Row row) {
    try {
      currentValue = udtf.transform(row);
    } catch (Exception e) {
      onError("transform(Row)", e);
    }
  }

  /**
   * Feeds one row window to the UDTF, writing results into the collector.
   *
   * @param rowWindow the input window
   * @throws RuntimeException if the UDTF throws
   */
  public void execute(RowWindow rowWindow) {
    try {
      udtf.transform(rowWindow, collector);
    } catch (Exception e) {
      onError("transform(RowWindow, PointCollector)", e);
    }
  }

  /**
   * Feeds a batch of columns to the UDTF, which writes its output into {@code builder}.
   *
   * @param columns the input columns
   * @param builder the builder receiving the output
   * @throws RuntimeException if the UDTF throws
   */
  public void execute(Column[] columns, ColumnBuilder builder) {
    try {
      udtf.transform(columns, builder);
    } catch (Exception e) {
      // The label mirrors the parameter types of the call above so the message names the UDTF
      // method that actually failed.
      onError("transform(Column[], ColumnBuilder)", e);
    }
  }

  /**
   * Returns the value stored by the last successful {@link #execute(Row)} call, or {@code null}
   * if none has been stored yet.
   */
  public Object getCurrentValue() {
    return currentValue;
  }

  /**
   * Invokes the UDTF's {@code terminate} hook with the collector.
   *
   * @throws RuntimeException if the UDTF throws
   */
  public void terminate() {
    try {
      udtf.terminate(collector);
    } catch (Exception e) {
      onError("terminate(PointCollector)", e);
    }
  }

  /** Invokes the UDTF's {@code beforeDestroy} hook if a UDTF has been reflected. */
  public void beforeDestroy() {
    if (udtf != null) {
      udtf.beforeDestroy();
    }
  }

  /**
   * Logs the failure and rethrows it as a {@link RuntimeException}.
   *
   * <p>The message text is kept exactly as before (it embeds {@code e.toString()}); in addition
   * the original exception is attached as the cause so its stack trace is not lost to callers.
   *
   * @param methodName signature-like label of the UDTF method that failed
   * @param e the exception thrown by the UDTF
   */
  private void onError(String methodName, Exception e) {
    LOGGER.warn(
        "Error occurred during executing UDTF, please check whether the implementation of UDF is correct according to the udf-api description.",
        e);
    throw new RuntimeException(
        String.format(
                "Error occurred during executing UDTF#%s: %s, please check whether the implementation of UDF is correct according to the udf-api description.",
                methodName, System.lineSeparator())
            + e,
        e);
  }

  /** Returns the configurations object that is passed to the UDTF's {@code beforeStart} hook. */
  public UDTFConfigurations getConfigurations() {
    return configurations;
  }

  /**
   * Returns the collector created in {@link #beforeStart}; this class leaves it {@code null} when
   * the access strategy is {@code MAPPABLE_ROW_BY_ROW} or when {@code beforeStart} has not run.
   */
  public ElasticSerializableTVList getCollector() {
    return collector;
  }
}