blob: d8a49e7e7769c2f4889ee6e3045f13c2a5628097 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.function;
import org.apache.iotdb.flink.sql.common.Options;
import org.apache.iotdb.flink.sql.common.Utils;
import org.apache.iotdb.flink.sql.exception.IllegalSchemaException;
import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Arrays;
import java.util.List;
/**
 * A bounded-scan {@link RichInputFormat} that reads rows from IoTDB by executing the
 * configured SQL query over a single {@link Session}, optionally restricted to the
 * configured {@code [lowerBound, upperBound]} time range.
 *
 * <p>A negative bound means "unset"; a bound of {@code 0} is a valid timestamp and is
 * honored. The scan is executed as one query, so exactly one input split is produced.
 */
public class IoTDBBoundedScanFunction extends RichInputFormat<RowData, InputSplit> {

  private final ReadableConfig options;
  // Base query from the connector options; time-range predicates are appended in open().
  private final String sql;
  private final List<Tuple2<String, DataType>> tableSchema;
  // Negative means "no bound"; 0 is a legal timestamp and counts as a bound.
  private final long lowerBound;
  private final long upperBound;

  private Session session;
  private SessionDataSet dataSet;
  private List<String> columnNames;

  public IoTDBBoundedScanFunction(ReadableConfig options, SchemaWrapper schemaWrapper) {
    this.options = options;
    tableSchema = schemaWrapper.getSchema();
    sql = options.get(Options.SQL);
    lowerBound = options.get(Options.SCAN_BOUNDED_LOWER_BOUND);
    upperBound = options.get(Options.SCAN_BOUNDED_UPPER_BOUND);
  }

  @Override
  public void configure(Configuration configuration) {
    // do nothing
  }

  @Override
  public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
    return baseStatistics;
  }

  @Override
  public InputSplit[] createInputSplits(int i) {
    // The whole scan runs as a single query, so a single split suffices.
    return new GenericInputSplit[] {new GenericInputSplit(1, 1)};
  }

  @Override
  public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
    return new DefaultInputSplitAssigner(inputSplits);
  }

  /**
   * Opens the shared IoTDB session once per task, before any split is processed.
   *
   * @throws RuntimeException wrapping {@link IoTDBConnectionException} if the session
   *     cannot be opened
   */
  @Override
  public void openInputFormat() {
    session =
        new Session.Builder()
            .nodeUrls(Arrays.asList(options.get(Options.NODE_URLS).split(",")))
            .username(options.get(Options.USER))
            .password(options.get(Options.PASSWORD))
            .build();
    try {
      session.open(false);
    } catch (IoTDBConnectionException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Executes the (possibly time-bounded) query and validates that the IoTDB column types
   * match the Flink table schema.
   *
   * @throws RuntimeException wrapping the underlying IoTDB exception on query failure
   * @throws IllegalSchemaException if a column's IoTDB type differs from its Flink type
   */
  @Override
  public void open(InputSplit inputSplit) {
    try {
      dataSet = session.executeQueryStatement(buildRangeQuery());
      columnNames = dataSet.getColumnNames();
      validateSchema();
    } catch (StatementExecutionException | IoTDBConnectionException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Appends TIME predicates for the configured bounds to the base SQL.
   *
   * <p>Bug fix: the previous logic tested {@code > 0}, so a bound of exactly 0 fell into
   * the wrong branch (e.g. lowerBound=0 with upperBound unset produced
   * {@code WHERE TIME >= 0 AND TIME <= -1}, matching nothing). A bound is now considered
   * set whenever it is {@code >= 0}.
   */
  private String buildRangeQuery() {
    boolean hasLower = lowerBound >= 0L;
    boolean hasUpper = upperBound >= 0L;
    if (hasLower && hasUpper) {
      return String.format("%s WHERE TIME >= %d AND TIME <= %d", sql, lowerBound, upperBound);
    } else if (hasLower) {
      return String.format("%s WHERE TIME >= %d", sql, lowerBound);
    } else if (hasUpper) {
      return String.format("%s WHERE TIME <= %d", sql, upperBound);
    }
    return sql;
  }

  /**
   * Checks that every table-schema column present in the result set has an IoTDB type
   * compatible with its declared Flink type.
   *
   * @throws IllegalSchemaException on the first mismatching column
   */
  private void validateSchema() throws StatementExecutionException, IoTDBConnectionException {
    for (Tuple2<String, DataType> field : tableSchema) {
      int index = columnNames.indexOf(field.f0);
      if (index < 0) {
        // Column not returned by the query; nothing to validate.
        continue;
      }
      TSDataType iotdbType = TSDataType.valueOf(dataSet.getColumnTypes().get(index));
      if (!Utils.isTypeEqual(iotdbType, field.f1)) {
        throw new IllegalSchemaException(
            String.format(
                "The data type of column `%s` is different in IoTDB and Flink", field.f0));
      }
    }
  }

  @Override
  public boolean reachedEnd() {
    try {
      return !dataSet.hasNext();
    } catch (StatementExecutionException | IoTDBConnectionException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public RowData nextRecord(RowData rowData) {
    // Note: the reuse object passed in is ignored; a fresh RowData is produced per record.
    try {
      RowRecord rowRecord = dataSet.next();
      return Utils.convert(rowRecord, columnNames, tableSchema);
    } catch (StatementExecutionException | IoTDBConnectionException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Closes the result set and the session. Bug fix: the previous code leaked the session
   * when {@code dataSet.close()} threw; the nested try/finally now guarantees the session
   * close is always attempted.
   */
  @Override
  public void close() {
    try {
      try {
        if (dataSet != null) {
          dataSet.close();
        }
      } finally {
        if (session != null) {
          session.close();
        }
      }
    } catch (IoTDBConnectionException | StatementExecutionException e) {
      throw new RuntimeException(e);
    }
  }
}