/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.test

import java.io.{File, FilenameFilter}
import java.util.ServiceLoader

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.util.Utils

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
/**
 * The SQL executor contract for the spark-common-test module.
 */
trait TestQueryExecutorRegister {
  def sql(sqlText: String): DataFrame

  def stop(): Unit

  def sqlContext: SQLContext
}
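
/**
 * Resolves the test paths (project root, store, warehouse, bad-record and
 * system folders) and loads the concrete [[TestQueryExecutorRegister]]
 * implementation through [[java.util.ServiceLoader]].
 *
 * Typical usage from a test:
 * {{{
 *   val df = TestQueryExecutor.INSTANCE.sql("SELECT 1")
 * }}}
 */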
object TestQueryExecutor {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val (projectPath, isIntegrationModule, localTarget) = {
val path = new File(this.getClass.getResource("/").getPath)
.getCanonicalPath.replaceAll("\\\\", "/")
    // Check whether this class was loaded from an integration module
    val isIntegrationModule = path.indexOf("/integration/") > -1
    // Get the local target folder path ("/target/".length == 8)
    val targetPath = path.substring(0, path.lastIndexOf("/target/") + 8)
    // Derive the project root path from the class location
val projectPathLocal = if (isIntegrationModule) {
path.substring(0, path.indexOf("/integration/"))
} else if (path.indexOf("/datamap/") > -1) {
path.substring(0, path.indexOf("/datamap/"))
} else if (path.indexOf("/tools/") > -1) {
path.substring(0, path.indexOf("/tools/"))
} else if (path.indexOf("/examples/") > -1) {
path.substring(0, path.indexOf("/examples/"))
} else {
path
}
(projectPathLocal, isIntegrationModule, targetPath)
}
LOGGER.info(s"project path: $projectPath")
val integrationPath = s"$projectPath/integration"
  val target = if (isIntegrationModule) {
    // For integration modules, always point to the spark-common/target location
    s"$integrationPath/spark-common/target"
  } else {
    // Otherwise, point to the module's own target folder
    localTarget
  }
val metaStoreDB = target
val location = s"$target/dbpath"
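  // Spark master for tests; override with -Dspark.master.url, otherwise run locally on two cores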
  val masterUrl = System.getProperty("spark.master.url", "local[2]")
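  // File system root for tests; set -Dhdfs.url to an hdfs:// URL to run against HDFS,
  // otherwise everything stays on the local file system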
val hdfsUrl = {
val property = System.getProperty("hdfs.url")
if (property == null) {
"local"
} else {
LOGGER.info("HDFS PATH given : " + property)
property
}
}
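  // Test resources are read from HDFS when an hdfs:// URL is supplied,
  // otherwise from the local source tree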
val resourcesPath = if (hdfsUrl.startsWith("hdfs://")) {
hdfsUrl
} else {
s"$integrationPath/spark-common-test/src/test/resources"
}
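  // Carbon store location: on HDFS, switch to HDFS-based locking and start from a
  // clean, uniquely named store directory; locally, use local locking and a fixed
  // path under the target folder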
val storeLocation = if (hdfsUrl.startsWith("hdfs://")) {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOCK_TYPE,
CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
val carbonFile = FileFactory.getCarbonFile(s"$hdfsUrl/store")
FileFactory.deleteAllCarbonFilesOfDir(carbonFile)
s"$hdfsUrl/store_" + System.nanoTime()
} else {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOCK_TYPE,
CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL)
s"$target/store"
}
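  // Hive warehouse directory, recreated with a unique name per run on HDFS and fixed locally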
val warehouse = if (hdfsUrl.startsWith("hdfs://")) {
val carbonFile = FileFactory.getCarbonFile(s"$hdfsUrl/warehouse")
FileFactory.deleteAllCarbonFilesOfDir(carbonFile)
s"$hdfsUrl/warehouse_" + System.nanoTime()
} else {
s"$target/warehouse"
}
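  // Location for bad records produced during data load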
val badStoreLocation = if (hdfsUrl.startsWith("hdfs://")) {
s"$hdfsUrl/bad_store_" + System.nanoTime()
} else {
s"$target/bad_store"
}
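  // Location of the Carbon system folder (system-level metadata)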
val systemFolderPath = if (hdfsUrl.startsWith("hdfs://")) {
s"$hdfsUrl/systemfolder" + System.nanoTime()
} else {
s"$target/systemfolder"
}
createDirectory(badStoreLocation)
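  // Directory where tests persist Hive query results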
val hiveresultpath = if (hdfsUrl.startsWith("hdfs://")) {
val p = s"$hdfsUrl/hiveresultpath"
FileFactory.mkdirs(p)
p
} else {
val p = s"$target/hiveresultpath"
new File(p).mkdirs()
p
}
LOGGER.info(s"""Store path taken $storeLocation""")
LOGGER.info(s"""Warehouse path taken $warehouse""")
LOGGER.info(s"""Resource path taken $resourcesPath""")
lazy val modules = Seq(TestQueryExecutor.projectPath + "/common/target",
TestQueryExecutor.projectPath + "/core/target",
TestQueryExecutor.projectPath + "/hadoop/target",
TestQueryExecutor.projectPath + "/processing/target",
TestQueryExecutor.projectPath + "/integration/spark-datasource/target",
TestQueryExecutor.projectPath + "/integration/spark-common/target",
TestQueryExecutor.projectPath + "/integration/spark2/target",
TestQueryExecutor.projectPath + "/integration/spark-common/target/jars",
TestQueryExecutor.projectPath + "/streaming/target",
TestQueryExecutor.projectPath + "/store/sdk/target")
  lazy val jars = {
    val jarsLocal = new ArrayBuffer[String]()
    modules.foreach { path =>
      val files = new File(path).listFiles(new FilenameFilter {
        override def accept(dir: File, name: String): Boolean = {
          name.endsWith(".jar")
        }
      })
      // listFiles returns null when the directory does not exist (e.g. a module
      // that has not been built), so guard before iterating
      if (files != null) {
        files.foreach(jarsLocal += _.getAbsolutePath)
      }
    }
    jarsLocal
  }
lazy val INSTANCE = lookupQueryExecutor.newInstance().asInstanceOf[TestQueryExecutorRegister]
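  // Global Carbon defaults for tests: force bad-record handling, point bad records
  // and the system folder at the test locations, and cap the driver and executor
  // LRU caches at 1024 MB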
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, badStoreLocation)
.addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "1024")
.addProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE, "1024")
.addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, systemFolderPath)
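  /**
   * Locates the Spark2 implementation of [[TestQueryExecutorRegister]] on the
   * classpath via [[java.util.ServiceLoader]].
   */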
  private def lookupQueryExecutor: Class[_] = {
    import scala.collection.JavaConverters._
    ServiceLoader.load(classOf[TestQueryExecutorRegister], Utils.getContextOrSparkClassLoader)
      .asScala
      .find(_.getClass.getName.equals("org.apache.spark.sql.test.Spark2TestQueryExecutor"))
      .getOrElse(throw new RuntimeException(
        "No Spark2TestQueryExecutor implementation found on the classpath"))
      .getClass
  }
  private def createDirectory(path: String): Unit = {
    FileFactory.mkdirs(path)
  }
}