blob: 087bef22d0980b6e0c4ff4ec777876123fcbc4a1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark
import scala.collection.mutable
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.parser.CarbonStreamParser
/**
 * Accessor over the raw key/value options supplied by the user.
 *
 * Each member looks up one well-known key in `userInputMap` and falls back to a
 * default when the key is absent. All members are evaluated lazily, so a missing
 * mandatory option is only reported when the corresponding member is first read.
 *
 * @param userInputMap options exactly as provided by the user
 */
class StreamingOption(val userInputMap: Map[String, String]) {

  /**
   * Trigger built from the "trigger" and "interval" options.
   *
   * Both options are read (in that order) before the trigger name is checked, so a
   * missing "interval" is reported even if the trigger name is also invalid.
   * "ProcessingTime" is the only accepted trigger name.
   *
   * @throws MalformedCarbonCommandException if either option is missing or the
   *                                         trigger name is not "ProcessingTime"
   */
  lazy val trigger: Trigger = {
    val triggerName = userInputMap.getOrElse(
      "trigger",
      throw new MalformedCarbonCommandException("trigger must be specified"))
    val triggerInterval = userInputMap.getOrElse(
      "interval",
      throw new MalformedCarbonCommandException("interval must be specified"))
    if (triggerName == "ProcessingTime") {
      ProcessingTime(triggerInterval)
    } else {
      throw new MalformedCarbonCommandException("invalid trigger: " + triggerName)
    }
  }

  /**
   * Value of the "checkpointLocation" option, or the directory returned by
   * `CarbonTablePath.getStreamingCheckpointDir` for `tablePath` when it is not set.
   *
   * @param tablePath table path used only to derive the fallback location
   */
  def checkpointLocation(tablePath: String): String = {
    userInputMap.getOrElse(
      "checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
  }

  /** Value of the "timestampformat" option, or the Carbon default timestamp format. */
  lazy val timeStampFormat: String = {
    userInputMap.getOrElse(
      "timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
  }

  /** Value of the "dateformat" option, or the Carbon default date format. */
  lazy val dateFormat: String = {
    userInputMap.getOrElse("dateformat", CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
  }

  /** Value of the stream parser option, or the row parser constant when it is not set. */
  lazy val rowParser: String = {
    userInputMap.getOrElse(
      CarbonStreamParser.CARBON_STREAM_PARSER,
      CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
  }

  /** Value of the "bad_record_path" option, or the configured Carbon property. */
  lazy val badRecordsPath: String = optionOrProperty(
    "bad_record_path",
    CarbonCommonConstants.CARBON_BADRECORDS_LOC,
    CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)

  /** Value of the "bad_records_action" option, or the configured Carbon property. */
  lazy val badRecordsAction: String = optionOrProperty(
    "bad_records_action",
    CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
    CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)

  /** Value of the "bad_records_logger_enable" option, or the configured Carbon property. */
  lazy val badRecordsLogger: String = optionOrProperty(
    "bad_records_logger_enable",
    CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
    CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)

  /** Value of the "is_empty_bad_record" option, or the configured Carbon property. */
  lazy val isEmptyBadRecord: String = optionOrProperty(
    "is_empty_bad_record",
    CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
    CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)

  /**
   * Copy of the user options without the fixed keys: checkpoint location, timestamp
   * and date formats, trigger, interval and the stream parser key. Every other entry
   * is kept unchanged.
   */
  lazy val remainingOption: Map[String, String] = {
    val fixedKeys = Set(
      "checkpointLocation",
      "timestampformat",
      "dateformat",
      "trigger",
      "interval",
      CarbonStreamParser.CARBON_STREAM_PARSER)
    // Rebuild a plain immutable map from the entries whose key is not a fixed one.
    userInputMap.iterator.filterNot { case (key, _) => fixedKeys.contains(key) }.toMap
  }

  /**
   * Looks up `optionKey` in the user options; when absent, reads `propertyKey` from
   * `CarbonProperties` with `propertyDefault` as its default. `CarbonProperties` is
   * only consulted when the user option is missing.
   */
  private def optionOrProperty(
      optionKey: String,
      propertyKey: String,
      propertyDefault: String): String = {
    userInputMap.getOrElse(
      optionKey,
      CarbonProperties.getInstance().getProperty(propertyKey, propertyDefault))
  }
}