blob: a52a016546ecfb6ddc42d59d9552a67081fb0bf5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.examples
import java.io.File
import java.text.SimpleDateFormat
import java.util.Date
import alluxio.cli.fs.FileSystemShell
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.examples.util.ExampleUtils
/**
* configure alluxio:
* 1.start alluxio
* 2. Please upload data to alluxio if you set runShell as false
* ./bin/alluxio fs copyFromLocal /carbondata_path/hadoop/src/test/resources/data.csv /
* 3.Get more details at: https://www.alluxio.org/docs/1.8/en/compute/Spark.html
*/
object AlluxioExample {
def main (args: Array[String]) {
val carbon = ExampleUtils.createCarbonSession("AlluxioExample",
storePath = "alluxio://localhost:19998/carbondata")
val runShell: Boolean = if (null != args && args.length > 0) {
args(0).toBoolean
} else {
true
}
exampleBody(carbon, runShell)
carbon.close()
}
def exampleBody (spark: SparkSession, runShell: Boolean = true): Unit = {
val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
spark.sparkContext.hadoopConfiguration.set("fs.alluxio.impl", "alluxio.hadoop.FileSystem")
FileFactory.getConfiguration.set("fs.alluxio.impl", "alluxio.hadoop.FileSystem")
// Specify date format based on raw data
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
val time = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
val alluxioPath = "alluxio://localhost:19998"
var alluxioFile = alluxioPath + "/data.csv"
val remoteFile = "/carbon_alluxio" + time + ".csv"
var mFsShell: FileSystemShell = null
// avoid dependency alluxio shell when running it with spark-submit
if (runShell) {
mFsShell = new FileSystemShell()
alluxioFile = alluxioPath + remoteFile
val localFile = rootPath + "/hadoop/src/test/resources/data.csv"
mFsShell.run("copyFromLocal", localFile, remoteFile)
}
import spark._
sql("DROP TABLE IF EXISTS alluxio_table")
sql(
s"""
| CREATE TABLE IF NOT EXISTS alluxio_table(
| ID Int,
| date Date,
| country String,
| name String,
| phonetype String,
| serialname String,
| salary Int)
| STORED BY 'carbondata'
| TBLPROPERTIES(
| 'SORT_COLUMNS' = 'phonetype,name',
| 'TABLE_BLOCKSIZE'='32',
| 'AUTO_LOAD_MERGE'='true')
""".stripMargin)
for (i <- 0 until 2) {
sql(
s"""
| LOAD DATA LOCAL INPATH '$alluxioFile'
| into table alluxio_table
""".stripMargin)
}
sql("SELECT * FROM alluxio_table").show()
sql("SHOW SEGMENTS FOR TABLE alluxio_table").show()
sql(
"""
| SELECT country, count(salary) AS amount
| FROM alluxio_table
| WHERE country IN ('china','france')
| GROUP BY country
""".stripMargin).show()
for (i <- 0 until 2) {
sql(
s"""
| LOAD DATA LOCAL INPATH '$alluxioFile'
| into table alluxio_table
""".stripMargin)
}
sql("SHOW SEGMENTS FOR TABLE alluxio_table").show()
if (runShell) {
mFsShell.run("rm", remoteFile)
mFsShell.close()
}
sql("DROP TABLE IF EXISTS alluxio_table")
}
}