blob: 247a62566ba62cb0337af0947ca32c57d4f05625 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.tpcds;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.table.tpcds.schema.TpcdsSchema;
import org.apache.flink.table.tpcds.schema.TpcdsSchemaProvider;
import org.apache.flink.table.tpcds.stats.TpcdsStatsProvider;
import org.apache.flink.table.types.utils.TypeConversions;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
/** End-to-end test for TPC-DS. */
public class TpcdsTestProgram {
private static final List<String> TPCDS_TABLES =
Arrays.asList(
"catalog_sales",
"catalog_returns",
"inventory",
"store_sales",
"store_returns",
"web_sales",
"web_returns",
"call_center",
"catalog_page",
"customer",
"customer_address",
"customer_demographics",
"date_dim",
"household_demographics",
"income_band",
"item",
"promotion",
"reason",
"ship_mode",
"store",
"time_dim",
"warehouse",
"web_page",
"web_site");
private static final List<String> TPCDS_QUERIES =
Arrays.asList(
"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14a",
"14b", "15", "16", "17", "18", "19", "20", "21", "22", "23a", "23b", "24a",
"24b", "25", "26", "27", "28", "29", "30", "31", "32", "33", "34", "35", "36",
"37", "38", "39a", "39b", "40", "41", "42", "43", "44", "45", "46", "47", "48",
"49", "50", "51", "52", "53", "54", "55", "56", "57", "58", "59", "60", "61",
"62", "63", "64", "65", "66", "67", "68", "69", "70", "71", "72", "73", "74",
"75", "76", "77", "78", "79", "80", "81", "82", "83", "84", "85", "86", "87",
"88", "89", "90", "91", "92", "93", "94", "95", "96", "97", "98", "99");
private static final String QUERY_PREFIX = "query";
private static final String QUERY_SUFFIX = ".sql";
private static final String DATA_SUFFIX = ".dat";
private static final String RESULT_SUFFIX = ".ans";
private static final String COL_DELIMITER = "|";
private static final String FILE_SEPARATOR = "/";
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
String sourceTablePath = params.getRequired("sourceTablePath");
String queryPath = params.getRequired("queryPath");
String sinkTablePath = params.getRequired("sinkTablePath");
Boolean useTableStats = params.getBoolean("useTableStats");
TableEnvironment tableEnvironment = prepareTableEnv(sourceTablePath, useTableStats);
// execute TPC-DS queries
for (String queryId : TPCDS_QUERIES) {
System.out.println("[INFO]Run TPC-DS query " + queryId + " ...");
String queryName = QUERY_PREFIX + queryId + QUERY_SUFFIX;
String queryFilePath = queryPath + FILE_SEPARATOR + queryName;
String queryString = loadFile2String(queryFilePath);
Table resultTable = tableEnvironment.sqlQuery(queryString);
// register sink table
String sinkTableName = QUERY_PREFIX + queryId + "_sinkTable";
((TableEnvironmentInternal) tableEnvironment)
.registerTableSinkInternal(
sinkTableName,
new CsvTableSink(
sinkTablePath + FILE_SEPARATOR + queryId + RESULT_SUFFIX,
COL_DELIMITER,
1,
FileSystem.WriteMode.OVERWRITE,
resultTable.getSchema().getFieldNames(),
resultTable.getSchema().getFieldDataTypes()));
TableResult tableResult = resultTable.executeInsert(sinkTableName);
// wait job finish
tableResult.getJobClient().get().getJobExecutionResult().get();
System.out.println("[INFO]Run TPC-DS query " + queryId + " success.");
}
}
/**
* Prepare TableEnvironment for query.
*
* @param sourceTablePath
* @return
*/
private static TableEnvironment prepareTableEnv(String sourceTablePath, Boolean useTableStats) {
// init Table Env
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
// config Optimizer parameters
tEnv.getConfig()
.getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
tEnv.getConfig()
.getConfiguration()
.setString(
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
tEnv.getConfig()
.getConfiguration()
.setLong(
OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,
10 * 1024 * 1024);
tEnv.getConfig()
.getConfiguration()
.setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
// register TPC-DS tables
TPCDS_TABLES.forEach(
table -> {
TpcdsSchema schema = TpcdsSchemaProvider.getTableSchema(table);
CsvTableSource.Builder builder = CsvTableSource.builder();
builder.path(sourceTablePath + FILE_SEPARATOR + table + DATA_SUFFIX);
for (int i = 0; i < schema.getFieldNames().size(); i++) {
builder.field(
schema.getFieldNames().get(i),
TypeConversions.fromDataTypeToLegacyInfo(
schema.getFieldTypes().get(i)));
}
builder.fieldDelimiter(COL_DELIMITER);
builder.emptyColumnAsNull();
builder.lineDelimiter("\n");
CsvTableSource tableSource = builder.build();
ConnectorCatalogTable catalogTable =
ConnectorCatalogTable.source(tableSource, true);
tEnv.getCatalog(tEnv.getCurrentCatalog())
.ifPresent(
catalog -> {
try {
catalog.createTable(
new ObjectPath(
tEnv.getCurrentDatabase(), table),
catalogTable,
false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
});
// register statistics info
if (useTableStats) {
TpcdsStatsProvider.registerTpcdsStats(tEnv);
}
return tEnv;
}
private static String loadFile2String(String filePath) throws Exception {
StringBuilder stringBuilder = new StringBuilder();
Stream<String> stream = Files.lines(Paths.get(filePath), StandardCharsets.UTF_8);
stream.forEach(s -> stringBuilder.append(s).append('\n'));
return stringBuilder.toString();
}
}