blob: c28952f08468e096548e7b1a22750cff037bcd70 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.apps.tpch
import org.apache.wayang.apps.tpch.queries.{Query1, Query3Database, Query3File, Query3Hybrid}
import org.apache.wayang.apps.util.{Parameters, ProfileDBHelper, StdOut}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.Configuration
import org.apache.wayang.core.platform.Platform
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate
import org.apache.wayang.postgres.Postgres
import org.apache.wayang.postgres.operators.PostgresTableSource
import org.apache.wayang.sqlite3.Sqlite3
import org.apache.wayang.sqlite3.operators.Sqlite3TableSource
/**
* This app adapts some TPC-H queries.
*/
object TpcH {
def main(args: Array[String]): Unit = {
if (args.isEmpty) {
println(s"Usage: <main class> ${Parameters.experimentHelp} <plugin(,plugin)*> <TPC-H config URL> <query> [<query args>*]")
sys.exit(1)
}
val experimentArg = args(0)
val plugins = Parameters.loadPlugins(args(1))
val configUrl = args(2)
val queryName = args(3)
val jdbcPlatform = {
val jdbcPlatforms = plugins
.flatMap(
plugin => {
val list = plugin.getRequiredPlatforms
var array: List[Platform] = List[Platform]()
val iterator = list.iterator()
while(iterator.hasNext){
array :+= iterator.next()
}
array
}
)
.filter(_.isInstanceOf[JdbcPlatformTemplate])
.distinct
if (jdbcPlatforms.size == 1) jdbcPlatforms.head.asInstanceOf[JdbcPlatformTemplate]
else if (jdbcPlatforms.isEmpty) null
else throw new IllegalArgumentException(s"Detected multiple databases: ${jdbcPlatforms.mkString(", ")}.")
}
val createTableSource =
if (jdbcPlatform == null) {
(table: String, columns: Seq[String]) => throw new IllegalStateException("No database plugin detected.")
} else if (jdbcPlatform.equals(Sqlite3.platform)) {
(table: String, columns: Seq[String]) => new Sqlite3TableSource(table, columns: _*)
} else if (jdbcPlatform.equals(Postgres.platform)) {
(table: String, columns: Seq[String]) => new PostgresTableSource(table, columns: _*)
} else {
throw new IllegalArgumentException(s"Unsupported database: $jdbcPlatform.")
}
val configuration = new Configuration
configuration.load(configUrl)
var experiment: Experiment = null
queryName match {
case "Q1" =>
val query = new Query1(plugins: _*)
experiment = Parameters.createExperiment(experimentArg, query)
experiment.getSubject.addConfiguration("plugins", args(1))
experiment.getSubject.addConfiguration("query", args(3))
val result = query(configuration, jdbcPlatform, createTableSource)(experiment)
StdOut.printLimited(result, 10)
case "Q3File" =>
val query = new Query3File(plugins: _*)
experiment = Parameters.createExperiment(experimentArg, query)
experiment.getSubject.addConfiguration("plugins", args(1))
experiment.getSubject.addConfiguration("query", args(3))
val result = query(configuration)(experiment)
StdOut.printLimited(result, 10)
case "Q3" =>
val query = new Query3Database(plugins: _*)
experiment = Parameters.createExperiment(experimentArg, query)
experiment.getSubject.addConfiguration("plugins", args(1))
experiment.getSubject.addConfiguration("query", args(3))
val result = query(configuration, jdbcPlatform, createTableSource)(experiment)
StdOut.printLimited(result, 10)
case "Q3Hybrid" =>
val query = new Query3Hybrid(plugins: _*)
experiment = Parameters.createExperiment(experimentArg, query)
experiment.getSubject.addConfiguration("plugins", args(1))
experiment.getSubject.addConfiguration("query", args(3))
val result = query(configuration, jdbcPlatform, createTableSource)(experiment)
StdOut.printLimited(result, 10)
case other: String => {
println(s"Unknown query: $other")
sys.exit(1)
}
}
// Store experiment data.
ProfileDBHelper.store(experiment, configuration)
}
}