blob: 7aec40f0839e7fda4298ef566b3f7deab8393b9e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.commons.udf.builtin.String;
import org.apache.iotdb.udf.api.UDTF;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.udf.api.customizer.strategy.MappableRowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BytesUtils;
import java.io.IOException;
/**
 * Built-in UDTF that concatenates the TEXT values of all input series of a row and combines the
 * result with the constant "target" strings supplied as attributes.
 *
 * Attributes read by this function:
 *
 * - every attribute whose key starts with {@code target}: its non-null value is appended to the
 *   constant target string, in the iteration order of the attribute map;
 * - {@code series_behind}: when {@code true} the concatenated series values are placed behind the
 *   targets, otherwise in front of them. The default value is {@code false}.
 *
 * Null series values are skipped during concatenation.
 */
public class UDTFConcat implements UDTF {

  /** Whether the concatenated series values are placed behind the targets. */
  private boolean seriesBehind;

  /** Concatenation of all target attribute values; filled in {@code beforeStart}. */
  private final StringBuilder concatTargets = new StringBuilder();

  /**
   * Validates that every child expression of the call is a TEXT input series.
   *
   * @param validator validator giving access to the call's parameters
   * @throws Exception if validation of any input series fails
   */
  @Override
  public void validate(UDFParameterValidator validator) throws Exception {
    int size = validator.getParameters().getChildExpressions().size();
    for (int i = 0; i < size; i++) {
      validator.validateInputSeriesDataType(i, Type.TEXT);
    }
  }

  /**
   * Collects the target strings and the {@code series_behind} flag from the attributes and
   * configures a mappable row-by-row access strategy with TEXT output.
   *
   * @param parameters attributes of the function call
   * @param configurations configuration object to receive access strategy and output type
   * @throws Exception declared by the overridden method
   */
  @Override
  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
      throws Exception {
    // Start from an empty buffer so that a repeated call cannot duplicate the targets.
    concatTargets.setLength(0);
    parameters
        .getAttributes()
        .forEach(
            (key, value) -> {
              if (key.startsWith("target") && value != null) {
                concatTargets.append(value);
              }
            });
    seriesBehind = parameters.getBooleanOrDefault("series_behind", false);
    configurations
        .setAccessStrategy(new MappableRowByRowAccessStrategy())
        .setOutputDataType(Type.TEXT);
  }

  /**
   * Concatenates one row and emits the result at the row's timestamp.
   *
   * @param row input row
   * @param collector receives the concatenated string
   * @throws Exception if reading the row or writing the point fails
   */
  @Override
  public void transform(Row row, PointCollector collector) throws Exception {
    String res = concatRow(row);
    collector.putString(row.getTime(), res);
  }

  /**
   * Concatenates one row and returns the result as a binary value.
   *
   * @param row input row
   * @return the concatenated string converted with {@code BytesUtils.valueOf}
   * @throws IOException if reading the row fails
   */
  @Override
  public Object transform(Row row) throws IOException {
    return BytesUtils.valueOf(concatRow(row));
  }

  /**
   * Columnar variant: concatenates every row of the given columns and writes one binary value per
   * row to the builder.
   *
   * @param columns input columns; all but the last one are read as series values
   * @param builder receives one concatenated value per row
   * @throws Exception declared by the overridden method
   */
  @Override
  public void transform(Column[] columns, ColumnBuilder builder) throws Exception {
    int rowCount = columns[0].getPositionCount();
    // The last column is deliberately not read. NOTE(review): presumably it is not an input
    // series (e.g. a time column) -- confirm against the framework's column layout.
    int seriesCount = columns.length - 1;

    // Only the outer arrays are allocated; the per-column arrays come from the columns.
    Binary[][] inputFrame = new Binary[seriesCount][];
    boolean[][] isNullFrame = new boolean[seriesCount][];
    for (int i = 0; i < seriesCount; i++) {
      inputFrame[i] = columns[i].getBinaries();
      isNullFrame[i] = columns[i].isNull();
    }

    for (int row = 0; row < rowCount; row++) {
      StringBuilder concatSeries = new StringBuilder();
      for (int col = 0; col < seriesCount; col++) {
        if (isNullFrame[col][row]) {
          continue;
        }
        concatSeries.append(inputFrame[col][row].getStringValue(TSFileConfig.STRING_CHARSET));
      }
      builder.writeBinary(BytesUtils.valueOf(combineWithTargets(concatSeries)));
    }
  }

  /** Concatenates the non-null values of a row and combines them with the targets. */
  private String concatRow(Row row) throws IOException {
    StringBuilder concatSeries = new StringBuilder();
    for (int i = 0; i < row.size(); i++) {
      if (row.isNull(i)) {
        continue;
      }
      concatSeries.append(row.getString(i));
    }
    return combineWithTargets(concatSeries);
  }

  /** Puts the targets in front of or behind the given series text, depending on the flag. */
  private String combineWithTargets(StringBuilder concatSeries) {
    return seriesBehind
        ? concatSeries.insert(0, concatTargets).toString()
        : concatSeries.append(concatTargets).toString();
  }
}