/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.sql.tests;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.IteratorInputFormat;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sources.InputFormatTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

import java.io.Serializable;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* End-to-end test for batch SQL queries.
*
* <p>The sources are generated and bounded. The result is always constant.
*
 * <p>Parameters: -outputPath output file path for the CsvTableSink; -sqlStatement SQL statement
 * that is executed via {@link TableEnvironment#executeSql(String)}
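 *
 * <p>Example parameters (illustrative only; any statement that reads from table1/table2 and
 * inserts into sinkTable will do): {@code -outputPath /tmp/result.csv -sqlStatement "INSERT INTO
 * sinkTable SELECT key, rowtime FROM table1"}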
*/
public class BatchSQLTestProgram {
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
String outputPath = params.getRequired("outputPath");
String sqlStatement = params.getRequired("sqlStatement");
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
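        // Register the bounded generator sources and the CSV sink via the internal
        // TableEnvironment API, so the statement can read table1/table2 and write to sinkTable.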
((TableEnvironmentInternal) tEnv)
.registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0));
((TableEnvironmentInternal) tEnv)
.registerTableSourceInternal("table2", new GeneratorTableSource(5, 0.2f, 60, 5));
((TableEnvironmentInternal) tEnv)
.registerTableSinkInternal(
"sinkTable",
new CsvTableSink(outputPath)
.configure(
new String[] {"f0", "f1"},
new TypeInformation[] {Types.INT, Types.SQL_TIMESTAMP}));
TableResult result = tEnv.executeSql(sqlStatement);
        // wait for the job to finish
result.getJobClient().get().getJobExecutionResult().get();
}
/** TableSource for generated data. */
public static class GeneratorTableSource extends InputFormatTableSource<Row> {
private final int numKeys;
private final float recordsPerKeyAndSecond;
private final int durationSeconds;
private final int offsetSeconds;
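        // Generates rows for numKeys distinct keys at recordsPerKeyAndSecond rows per key and
        // second of simulated time, for durationSeconds, with event times shifted by the offset.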
GeneratorTableSource(
int numKeys, float recordsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
this.numKeys = numKeys;
this.recordsPerKeyAndSecond = recordsPerKeyAndSecond;
this.durationSeconds = durationSeconds;
this.offsetSeconds = offsetSeconds;
}
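        // The source is bounded: the underlying DataGenerator iterator is finite.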
@Override
public InputFormat<Row, ?> getInputFormat() {
return new IteratorInputFormat<>(
DataGenerator.create(
numKeys, recordsPerKeyAndSecond, durationSeconds, offsetSeconds));
}
@Override
public DataType getProducedDataType() {
return getTableSchema().toRowDataType();
}
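        // Generated rows carry an int key, a millisecond-precision event time, and a string payload.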
@Override
public TableSchema getTableSchema() {
return TableSchema.builder()
.field("key", DataTypes.INT())
.field("rowtime", DataTypes.TIMESTAMP(3))
.field("payload", DataTypes.STRING())
.build();
}
}
/** Iterator for generated data. */
public static class DataGenerator implements Iterator<Row>, Serializable {
private static final long serialVersionUID = 1L;
final int numKeys;
private int keyIndex = 0;
private final long durationMs;
private final long stepMs;
private final long offsetMs;
private long ms = 0;
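        // Converts the rate-based configuration into iterator steps: one pass over all keys every
        // (1000 / rowsPerKeyAndSecond) ms of simulated time, until durationMs is reached; the
        // event-time offset is applied as offsetSeconds * 2000 ms.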
static DataGenerator create(
int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
return new DataGenerator(
numKeys, durationSeconds * 1000, sleepMs, offsetSeconds * 2000L);
}
DataGenerator(int numKeys, long durationMs, long stepMs, long offsetMs) {
this.numKeys = numKeys;
this.durationMs = durationMs;
this.stepMs = stepMs;
this.offsetMs = offsetMs;
}
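        // Rows are produced until the simulated time has covered the configured duration.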
@Override
public boolean hasNext() {
return ms < durationMs;
}
@Override
public Row next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
Row row =
Row.of(
keyIndex,
LocalDateTime.ofInstant(
Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC),
"Some payload...");
++keyIndex;
if (keyIndex >= numKeys) {
keyIndex = 0;
ms += stepMs;
}
return row;
}
}
}