/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.examples

import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.slf4j.{Logger, LoggerFactory}

import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
import org.apache.carbondata.spark.util.CarbonSparkUtil

/**
 * Generates sample data with the CarbonData SDK and writes it to S3.
 * The number of rows can be controlled through the number-of-rows argument.
 */
object S3UsingSdkExample {

  // Prepare the SDK writer output: generate `num` sample rows and write them to `path`.
  def buildTestData(
      args: Array[String],
      path: String,
      num: Int = 3): Any = {

    // getCanonicalPath gives a path with \, but the code expects /.
    val writerPath = path.replace("\\", "/")

    // Schema of the generated rows: name (string), age (int), height (double).
    val fields: Array[Field] = new Array[Field](3)
    fields(0) = new Field("name", DataTypes.STRING)
    fields(1) = new Field("age", DataTypes.INT)
    fields(2) = new Field("height", DataTypes.DOUBLE)

    try {
      // Configure the SDK writer with the S3 credentials and endpoint passed on
      // the command line, then write the rows as CSV-style string arrays.
      val builder = CarbonWriter.builder()
      val writer =
        builder.outputPath(writerPath)
          .uniqueIdentifier(System.currentTimeMillis)
          .withBlockSize(2)
          .writtenBy("S3UsingSdkExample")
          .withHadoopConf(ACCESS_KEY, args(0))
          .withHadoopConf(SECRET_KEY, args(1))
          .withHadoopConf(ENDPOINT, CarbonSparkUtil.getS3EndPoint(args))
          .withCsvInput(new Schema(fields)).build()
      var i = 0
      while (i < num) {
        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
        i += 1
      }
      writer.close()
    } catch {
      case e: Exception => throw e
    }
  }
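
  // For instance (hypothetical credentials and bucket, for illustration only),
  // writing 100 rows to an S3 location:
  //   buildTestData(Array("<access-key>", "<secret-key>", "s3a://my-bucket/carbon-sdk-output"),
  //     "s3a://my-bucket/carbon-sdk-output", 100)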

  /**
   * This example demonstrates how to:
   * 1. create a carbon table whose storage location is on an object store
   *    such as AWS S3 or Huawei OBS
   * 2. load data into the carbon table so that the generated files are stored
   *    on the object store, and then query the table.
   *
   * @param args requires "access-key" and "secret-key"; optionally followed by
   *             "table-path-on-s3", "s3-endpoint", "number-of-rows" and "spark-master"
   */
  def main(args: Array[String]) {
    val logger: Logger = LoggerFactory.getLogger(this.getClass)
    import org.apache.spark.sql.CarbonUtils._

    // Validate the command-line arguments before touching S3.
    if (args.length < 2 || args.length > 6) {
      logger.error("Usage: java S3UsingSdkExample <access-key> <secret-key> " +
        "[table-path-on-s3] [s3-endpoint] [number-of-rows] [spark-master]")
      System.exit(0)
    }
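
    // A typical launch looks roughly like the following (the jar path, keys,
    // bucket and endpoint are placeholders, not values defined by this project):
    //   spark-submit --class org.apache.carbondata.examples.S3UsingSdkExample \
    //     /path/to/carbondata-examples.jar \
    //     <access-key> <secret-key> s3a://my-bucket/carbon-sdk-output \
    //     s3.amazonaws.com 100 local[2]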

    // Use a sample output location when no table path is supplied on the command line.
    val path = if (args.length < 3) {
      "s3a://sdk/WriterOutput2"
    } else {
      args(2)
    }

    // Resolve the Spark configuration key names for the credentials from the
    // path's scheme (e.g. s3a://).
    val (accessKey, secretKey, endpoint) = CarbonSparkUtil.getKeyOnPrefix(path)

    val spark = SparkSession
      .builder()
      .master(getSparkMaster(args))
      .appName("S3UsingSDKExample")
      .config("spark.driver.host", "localhost")
      .config(accessKey, args(0))
      .config(secretKey, args(1))
      .config(endpoint, CarbonSparkUtil.getS3EndPoint(args))
      .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
      .getOrCreate()
    CarbonEnv.getInstance(spark)
    spark.sparkContext.setLogLevel("WARN")

    // Number of rows to generate (defaults to 3).
    val num = if (args.length > 4) {
      Integer.parseInt(args(4))
    } else {
      3
    }

    // Write the sample data to S3 with the SDK, then map the written files to an
    // external carbon table and query it through Spark SQL.
    buildTestData(args, path, num)
    spark.sql("DROP TABLE IF EXISTS s3_sdk_table")
    spark.sql(s"CREATE EXTERNAL TABLE s3_sdk_table STORED BY 'carbondata'" +
      s" LOCATION '$path'")
    spark.sql("SELECT * FROM s3_sdk_table LIMIT 10").show()
    spark.stop()
  }

  def getSparkMaster(args: Array[String]): String = {
    if (args.length == 6) args(5)
    else "local"
  }
}