/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.tpcds;

import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

/**
 * To execute this main() method, run the following example command from the command line:
 *
 * <pre>{@code
 * ./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \
 *         --queries=3,26,55 \
 *         --tpcParallel=2 \
 *         --project=apache-beam-testing \
 *         --stagingLocation=gs://beamsql_tpcds_1/staging \
 *         --tempLocation=gs://beamsql_tpcds_2/temp \
 *         --runner=DataflowRunner \
 *         --region=us-west1 \
 *         --maxNumWorkers=10"
 * }</pre>
 */
public class BeamTpcds {
    private static final String dataDirectory = "gs://beamsql_tpcds_1/data";
    private static final String resultDirectory = "gs://beamsql_tpcds_1/tpcds_results";

    /**
     * Builds the template of a CREATE EXTERNAL TABLE statement for the given table. The two %s
     * placeholders are filled in later with the table's column schema and its data location.
     */
    private static String buildTableCreateStatement(String tableName) {
        return "CREATE EXTERNAL TABLE "
                + tableName
                + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES "
                + "'{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'";
    }
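    // For illustration only: assuming a hypothetical table "item" whose schema string is
    // "i_item_sk bigint" and whose data lives under dataDirectory, the fully formatted
    // statement would look roughly like:
    //   CREATE EXTERNAL TABLE item (i_item_sk bigint) TYPE text
    //   LOCATION 'gs://beamsql_tpcds_1/data/1G/item.dat'
    //   TBLPROPERTIES '{"format":"csv", "csvformat": "InformixUnload"}'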

    /** Returns the GCS path of the .dat file holding the given table's data for the given data size. */
    private static String buildDataLocation(String dataSize, String tableName) {
        return dataDirectory + "/" + dataSize + "/" + tableName + ".dat";
    }
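    // For example (illustrative values), dataSize "1G" and tableName "store_sales" resolve to
    // "gs://beamsql_tpcds_1/data/1G/store_sales.dat".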

    /** Registers all tables into env, setting their schemas and the locations of their corresponding data files. */
    private static void registerAllTables(BeamSqlEnv env, String dataSize) throws Exception {
        List<String> tableNames = TableSchemaJSONLoader.getAllTableNames();
        for (String tableName : tableNames) {
            String createStatement = buildTableCreateStatement(tableName);
            String tableSchema = TableSchemaJSONLoader.parseTableSchema(tableName);
            String dataLocation = buildDataLocation(dataSize, tableName);
            env.executeDdl(String.format(createStatement, tableSchema, dataLocation));
        }
    }

    public static void main(String[] args) throws Exception {
        InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
        inMemoryMetaStore.registerProvider(new TextTableProvider());

        TpcdsOptions tpcdsOptions =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);

        String dataSize = TpcdsParametersReader.getAndCheckDataSize(tpcdsOptions);
        String[] queryNameArr = TpcdsParametersReader.getAndCheckQueryNameArray(tpcdsOptions);
        int nThreads = TpcdsParametersReader.getAndCheckTpcParallel(tpcdsOptions);

        // Use an ExecutorService and a CompletionService to run the query pipelines concurrently.
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        CompletionService<PipelineResult> completion = new ExecutorCompletionService<>(executor);

        BeamSqlEnv env =
                BeamSqlEnv
                        .builder(inMemoryMetaStore)
                        .setPipelineOptions(tpcdsOptions)
                        .build();

        registerAllTables(env, dataSize);

        // Make an array of pipelines, each responsible for running one query.
        Pipeline[] pipelines = new Pipeline[queryNameArr.length];

        // Execute all queries, transform each result into a PCollection<String>, and write it as a
        // text file to the GCS results directory.
        for (int i = 0; i < queryNameArr.length; i++) {
            // For each query, get a fresh copy of the pipeline options from the command line
            // arguments and cast it to DataflowPipelineOptions to read and set the parameters
            // required for pipeline execution.
            TpcdsOptions tpcdsOptionsCopy =
                    PipelineOptionsFactory.fromArgs(args).withValidation().as(TpcdsOptions.class);
            DataflowPipelineOptions dataflowPipelineOptionsCopy =
                    tpcdsOptionsCopy.as(DataflowPipelineOptions.class);

            // Set a unique job name using the current timestamp so that multiple pipelines can run
            // at the same time.
            dataflowPipelineOptionsCopy.setJobName(queryNameArr[i] + "result" + System.currentTimeMillis());

            pipelines[i] = Pipeline.create(dataflowPipelineOptionsCopy);
            String queryString = QueryReader.readQuery(queryNameArr[i]);

            // Query execution
            PCollection<Row> rows =
                    BeamSqlRelUtils.toPCollection(pipelines[i], env.parseQuery(queryString));

            // Transform the result from PCollection<Row> into PCollection<String> and write it to
            // the location where results are stored.
            PCollection<String> rowStrings =
                    rows.apply(
                            MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.toString()));
            rowStrings.apply(
                    TextIO.write()
                            .to(resultDirectory + "/" + dataSize + "/" + pipelines[i].getOptions().getJobName())
                            .withSuffix(".txt")
                            .withNumShards(1));

            completion.submit(new TpcdsRun(pipelines[i]));
        }

        // Stop accepting new tasks; the submitted pipelines keep running to completion.
        executor.shutdown();
    }
}