Configuring CarbonData

This tutorial guides you through the advanced configurations of CarbonData:

System Configuration

This section provides the details of all the configurations required for the CarbonData System.

System Configuration in carbon.properties

| Property | Default Value | Description |
|----------|---------------|-------------|
| carbon.storelocation | | Location where CarbonData creates the store and writes data in its own format. If not specified, the spark.sql.warehouse.dir path is used. NOTE: The store location should be in HDFS. |
| carbon.ddl.base.hdfs.url | | Configures an HDFS relative path that is appended to the HDFS path configured in fs.defaultFS. If this path is configured, the user need not pass the complete path during data load. For example: if the absolute path of the CSV file is hdfs://10.18.101.155:54310/data/cnbc/2016/xyz.csv, the prefix "hdfs://10.18.101.155:54310" comes from the property fs.defaultFS and the user can configure /data/cnbc/ as carbon.ddl.base.hdfs.url. The CSV path can then be specified as /2016/xyz.csv during data load. |
| carbon.badRecords.location | | Path where the bad records are stored. |
| carbon.data.file.version | V3 | If this parameter is set to 1, CarbonData supports data load in the old format (0.x version). If it is set to 2 (1.x onwards), CarbonData supports data load in the new format only. |
| carbon.streaming.auto.handoff.enabled | true | If this parameter is set to true, the auto-trigger handoff function is enabled. |
| carbon.streaming.segment.max.size | 1024000000 | Defines the maximum size of a streaming segment, in bytes. Setting this parameter to an appropriate value avoids impacting streaming ingestion. |
| carbon.query.show.datamaps | true | If this parameter is set to true, the SHOW TABLES command lists all tables including datamaps (e.g. pre-aggregate tables); otherwise datamaps are excluded from the table list. |
| carbon.segment.lock.files.preserve.hours | 48 | Number of hours the segment lock files are preserved after data load. These lock files are deleted by the clean command after the configured number of hours. |
| carbon.unsafe.working.memory.in.mb | 512 | Size of the executor unsafe working memory, used for sorting data, storing column pages, etc. The value is expressed in MB. |
| carbon.unsafe.driver.working.memory.in.mb | 512 | Size of the driver unsafe working memory, used for storing the block or blocklet datamap cache. If not configured, the carbon.unsafe.working.memory.in.mb value is used. The value is expressed in MB. |
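
As an illustration, a minimal carbon.properties could combine the store location, a base HDFS URL for relative CSV paths, and a bad-records location. The host, port, and paths below reuse the example above and are purely illustrative, not defaults:

    # Illustrative values only; adjust to your cluster
    carbon.storelocation=hdfs://10.18.101.155:54310/user/hive/warehouse/carbon.store
    carbon.ddl.base.hdfs.url=/data/cnbc/
    carbon.badRecords.location=hdfs://10.18.101.155:54310/carbon/badrecords
    # With fs.defaultFS=hdfs://10.18.101.155:54310 and the base URL above,
    # a data load can refer to the CSV file simply as /2016/xyz.csv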

Performance Configuration

This section provides the details of all the configurations required for CarbonData Performance Optimization.

Performance Configuration in carbon.properties

  • Data Loading Configuration
| Parameter | Default Value | Description | Range |
|-----------|---------------|-------------|-------|
| carbon.number.of.cores.while.loading | 2 | Number of cores to be used while loading data. | |
| carbon.sort.size | 100000 | Record count to sort and write intermediate files to temp. | |
| carbon.max.driver.lru.cache.size | -1 | Maximum LRU cache size up to which data will be loaded on the driver side. The value is expressed in MB. The default value of -1 means there is no memory limit for caching. Only integer values greater than 0 are accepted. | |
| carbon.max.executor.lru.cache.size | -1 | Maximum LRU cache size up to which data will be loaded on the executor side. The value is expressed in MB. The default value of -1 means there is no memory limit for caching. Only integer values greater than 0 are accepted. If this parameter is not configured, the carbon.max.driver.lru.cache.size value is used. | |
| carbon.merge.sort.prefetch | true | Enables prefetch of data during merge sort while reading data from sort temp files during data loading. | |
| carbon.insert.persist.enable | false | Enables persisting the source dataframe. If an insert-into query reads from a source table using a select statement while the same source table is being loaded concurrently, the select may pick up new records for which dictionaries have not yet been generated, leading to inconsistency. To avoid this, the dataframe can be persisted to MEMORY_AND_DISK (the default storage level) before performing the insert-into operation. By default this value is false because persisting the dataframe is not needed in all cases. Enable this parameter if load and insert queries must run concurrently on the source table. | |
| carbon.insert.storage.level | MEMORY_AND_DISK | Storage level used to persist the dataframe when carbon.insert.persist.enable=true. If the executors have less memory, set this parameter to MEMORY_AND_DISK_SER or another storage level suited to the environment. See the Spark documentation on RDD persistence for details. | |
| carbon.update.persist.enable | true | Enables persisting the dataframe during UPDATE. Enabling this reduces the execution time of the UPDATE operation. | |
| carbon.update.storage.level | MEMORY_AND_DISK | Storage level used to persist the dataframe when carbon.update.persist.enable=true. If the executors have less memory, set this parameter to MEMORY_AND_DISK_SER or another storage level suited to the environment. See the Spark documentation on RDD persistence for details. | |
| carbon.global.sort.rdd.storage.level | MEMORY_ONLY | Storage level used to persist the RDD when loading data with sort_scope=global_sort. If the executors have less memory, set this parameter to MEMORY_AND_DISK_SER or another storage level suited to the environment. See the Spark documentation on RDD persistence for details. | |
| carbon.load.global.sort.partitions | 0 | Number of partitions to use when shuffling data for sort. If this is not configured, or is configured to a value less than 1, the number of map tasks is used as the number of reduce tasks. In general, 2-3 tasks per CPU core in the cluster are recommended. | |
| carbon.options.bad.records.logger.enable | false | Whether to create logs with details about bad records. | |
| carbon.bad.records.action | FORCE | This property can have four types of actions for bad records: FORCE, REDIRECT, IGNORE, and FAIL. If set to FORCE, the data is auto-corrected by storing the bad records as NULL. If set to REDIRECT, bad records are written to the raw CSV instead of being loaded. If set to IGNORE, bad records are neither loaded nor written to the raw CSV. If set to FAIL, data loading fails if any bad records are found. | |
| carbon.options.is.empty.data.bad.record | false | If false, empty ("" or '' or ,,) data is not considered a bad record, and vice versa. | |
| carbon.options.bad.record.path | | Specifies the HDFS path where bad records are stored. By default the value is Null. This path must be configured by the user if the bad record logger is enabled or the bad record action is REDIRECT. | |
| carbon.enable.vector.reader | true | Increases the performance of select queries by fetching a columnar batch of 4*1024 rows instead of fetching data row by row. | |
| carbon.blockletgroup.size.in.mb | 64 MB | Data is read as a group of blocklets, called a blocklet group. This parameter specifies the size of the blocklet group. A higher value results in better sequential IO access. The minimum value is 16 MB; any value less than 16 MB is reset to the default value (64 MB). | |
| carbon.task.distribution | block | block: launches one task per block. This setting is suggested for concurrent queries and queries with big shuffling scenarios. custom: groups the blocks and distributes them uniformly to the available resources in the cluster. This enhances query performance but is not suggested for concurrent queries and queries with big shuffling scenarios. blocklet: launches one task per blocklet. This setting is suggested for concurrent queries and queries with big shuffling scenarios. merge_small_files: merges all the small partitions to the size of spark.sql.files.maxPartitionBytes (128 MB by default; configurable) during querying. The small partitions are combined into one map task to reduce the number of read tasks, which enhances performance. | |
| carbon.load.sortmemory.spill.percentage | 0 | If unsafe memory is used during data loading, this configuration controls the behavior of spilling in-memory pages to disk. Internally, during sorting CarbonData sorts data in pages and adds them to unsafe memory. If the memory is insufficient, CarbonData spills the pages to disk and generates sort temp files. This configuration controls how many in-memory pages are spilled to disk, by size; the size can be calculated by multiplying this value with carbon.sort.storage.inmemory.size.inmb. For example, the default value 0 means that no pages in unsafe memory are spilled and all newly sorted data is spilled to disk; 50 means that if unsafe memory is insufficient, about half of the pages in unsafe memory are spilled to disk; 100 means that almost all pages in unsafe memory are spilled. Note: this configuration only works for LOCAL_SORT and BATCH_SORT, and the actual spilling behavior may be slightly different in each data load. | Integer values between 0 and 100 |
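
For example, bad-record handling during data load could be configured in carbon.properties so that bad records are redirected to a dedicated location. The values and path below are illustrative, not defaults:

    carbon.options.bad.records.logger.enable=true
    carbon.bad.records.action=REDIRECT
    carbon.options.is.empty.data.bad.record=false
    # REDIRECT requires a bad record path (illustrative location)
    carbon.options.bad.record.path=hdfs://10.18.101.155:54310/carbon/badrecords
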
  • Compaction Configuration
| Parameter | Default Value | Description | Range |
|-----------|---------------|-------------|-------|
| carbon.number.of.cores.while.compacting | 2 | Number of cores used to write data during compaction. | |
| carbon.compaction.level.threshold | 4, 3 | This property is for minor compaction and decides how many segments are merged. Example: if it is set to 2, 3, then minor compaction is triggered for every 2 segments; 3 is the number of level-1 compacted segments that are further compacted into a new segment. | Valid values are from 0-100. |
| carbon.major.compaction.size | 1024 | Major compaction size can be configured using this parameter. Segments whose combined size is below this threshold are merged. The value is expressed in MB. | |
| carbon.horizontal.compaction.enable | true | Turns horizontal compaction ON/OFF. After every DELETE and UPDATE statement, horizontal compaction may occur if the number of delta (DELETE/UPDATE) files exceeds the specified threshold. | |
| carbon.horizontal.UPDATE.compaction.threshold | 1 | Threshold limit on the number of UPDATE delta files within a segment. If the number of delta files exceeds the threshold, the UPDATE delta files within the segment become eligible for horizontal compaction and are compacted into a single UPDATE delta file. | Values between 1 to 10000. |
| carbon.horizontal.DELETE.compaction.threshold | 1 | Threshold limit on the number of DELETE delta files within a block of a segment. If the number of delta files exceeds the threshold, the DELETE delta files for that block of the segment become eligible for horizontal compaction and are compacted into a single DELETE delta file. | Values between 1 to 10000. |
| carbon.update.segment.parallelism | 1 | Parallelism used for each segment during update. If segments contain too many records to update and the Spark job encounters data-spill related errors, it is better to increase this value. It is recommended to set this value to a multiple of the number of executors for balance. | Values between 1 to 1000. |
| carbon.merge.index.in.segment | true | Merges all carbon index files (.carbonindex) inside a segment into a single carbon index merge file (.carbonindexmerge). | Values true or false |
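
For instance, a deployment with many small loads might tune compaction as follows in carbon.properties. These values are illustrative, not recommendations:

    carbon.number.of.cores.while.compacting=4
    carbon.compaction.level.threshold=6,5
    carbon.major.compaction.size=2048
    carbon.horizontal.compaction.enable=true
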
  • Query Configuration
| Parameter | Default Value | Description | Range |
|-----------|---------------|-------------|-------|
| carbon.number.of.cores | 4 | Number of cores to be used while querying. | |
| carbon.enable.quick.filter | false | Improves the performance of filter queries. | |

Miscellaneous Configuration

Extra Configuration in carbon.properties

  • Time format for CarbonData
| Parameter | Default Format | Description |
|-----------|----------------|-------------|
| carbon.timestamp.format | yyyy-MM-dd HH:mm:ss | Timestamp format of input data used for the timestamp data type. |
  • Dataload Configuration
| Parameter | Default Value | Description |
|-----------|---------------|-------------|
| carbon.sort.file.write.buffer.size | 16384 | File write buffer size used during sorting. The minimum allowed buffer size is 10240 bytes and the maximum is 10485760 bytes. |
| carbon.lock.type | LOCALLOCK | Specifies the type of lock to be acquired during concurrent operations on a table. The following lock implementations are available: LOCALLOCK: the lock is created as a file on the local file system. Useful when only one Spark driver (thrift server) runs on a machine and no other CarbonData Spark application is launched concurrently. HDFSLOCK: the lock is created as a file on HDFS. Useful when multiple CarbonData Spark applications are launched, no ZooKeeper is running on the cluster, and HDFS supports file-based locking. |
| carbon.lock.path | TABLEPATH | Locks on files are used to prevent concurrent operations from modifying the same files. This configuration specifies the path where lock files are created. It is recommended to set this property to an HDFS lock path when using an S3 file system, because locking is not feasible on S3. Note: if this property is not set to an HDFS location for an S3 store, there is a possibility of data corruption, because multiple data manipulation calls might try to update the status file and, since no lock is acquired before the update, data might get overwritten. |
| carbon.sort.intermediate.files.limit | 20 | Minimum number of intermediate files after which merge sort can be started (minValue = 2, maxValue = 50). |
| carbon.block.meta.size.reserved.percentage | 10 | Space reserved, as a percentage, for writing block metadata in the CarbonData file. |
| carbon.csv.read.buffersize.byte | 1048576 | CSV reading buffer size. |
| carbon.merge.sort.reader.thread | 3 | Maximum number of threads used for reading intermediate files for final merging. |
| carbon.concurrent.lock.retries | 100 | Maximum number of retries to obtain the lock for concurrent operations. This is used for concurrent loading. |
| carbon.concurrent.lock.retry.timeout.sec | 1 | Interval between retries to obtain the lock for concurrent operations. |
| carbon.lock.retries | 3 | Maximum number of retries to obtain the lock for any operation other than load. |
| carbon.lock.retry.timeout.sec | 5 | Interval between retries to obtain the lock for any operation other than load. |
| carbon.skip.empty.line | false | Setting this property ignores empty lines in the CSV file during data load. |
| carbon.enable.calculate.size | true | For load operations: setting this property calculates the size of the carbon data file (.carbondata) and carbon index file (.carbonindex) for every load and updates the table status file. For DESCRIBE FORMATTED: setting this property calculates the total size of the carbon data files and carbon index files for the respective table and displays it in the DESCRIBE FORMATTED command output. |
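
For example, when table data resides on S3, the lock-related properties could point to an HDFS location so that concurrent operations still acquire file-based locks. The path below is illustrative:

    carbon.lock.type=HDFSLOCK
    carbon.lock.path=hdfs://10.18.101.155:54310/carbon/locks
    carbon.concurrent.lock.retries=100
    carbon.concurrent.lock.retry.timeout.sec=1
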
  • Compaction Configuration
| Parameter | Default Value | Description |
|-----------|---------------|-------------|
| carbon.numberof.preserve.segments | 0 | If the user wants to preserve some number of segments from being compacted, this property can be set. Example: carbon.numberof.preserve.segments = 2 always excludes the 2 latest segments from compaction. No segments are preserved by default. |
| carbon.allowed.compaction.days | 0 | Compaction merges the segments that were loaded within the configured number of days. Example: if the configuration is 2, only segments loaded within a time frame of 2 days are merged; segments loaded more than 2 days apart are not merged. This is disabled by default. |
| carbon.enable.auto.load.merge | false | Enables compaction during data loading. |
| carbon.enable.page.level.reader.in.compaction | true | Enabling the page-level reader for compaction reduces memory usage while compacting a larger number of segments. It reads only page by page instead of reading the whole blocklet into memory. |
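
For instance, to compact automatically during loading while always keeping the two most recent segments untouched, carbon.properties could contain the following illustrative values:

    carbon.enable.auto.load.merge=true
    carbon.numberof.preserve.segments=2
    carbon.allowed.compaction.days=2
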
  • Query Configuration
| Parameter | Default Value | Description |
|-----------|---------------|-------------|
| max.query.execution.time | 60 | Maximum time allowed for one query to be executed. The value is in minutes. |
| carbon.enableMinMax | true | Min/max is a feature added to enhance query performance. To disable this feature, set it to false. |
| carbon.dynamicallocation.schedulertimeout | 5 | Maximum time, in seconds, that the scheduler waits for executors to be active. The minimum value is 5 sec and the maximum value is 15 sec. |
| carbon.scheduler.minregisteredresourcesratio | 0.8 | Minimum resource (executor) ratio needed for starting block distribution. The default value of 0.8 indicates that 80% of the requested resources must be allocated before block distribution starts. The minimum value is 0.1 and the maximum value is 1.0. |
| carbon.search.enabled (Alpha Feature) | false | If set to true, CarbonReader is used to do a distributed scan directly instead of using a compute framework like Spark, thus avoiding limitations of the compute framework such as the SQL optimizer and task scheduling overhead. |
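
For example, to cap query runtime and wait longer for executors on a busy cluster, carbon.properties could contain the following illustrative values:

    max.query.execution.time=30
    carbon.dynamicallocation.schedulertimeout=10
    carbon.scheduler.minregisteredresourcesratio=0.9
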
  • Global Dictionary Configurations
| Parameter | Default Value | Description |
|-----------|---------------|-------------|
| carbon.cutOffTimestamp | | Sets the start date for calculating the timestamp. Java counts the number of milliseconds from the start of "1970-01-01 00:00:00". This property customizes the starting point, for example "2000-01-01 00:00:00". The date must be in the format specified by carbon.timestamp.format. |
| carbon.timegranularity | SECOND | Sets the data granularity level: DAY, HOUR, MINUTE, or SECOND. |
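
For example, to count timestamps from the year 2000 with day-level granularity, carbon.properties could contain the following illustrative values (the cut-off must match carbon.timestamp.format):

    carbon.timestamp.format=yyyy-MM-dd HH:mm:ss
    carbon.cutOffTimestamp=2000-01-01 00:00:00
    carbon.timegranularity=DAY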

Spark Configuration

Spark Configuration Reference in spark-defaults.conf

| Parameter | Default Value | Description |
|-----------|---------------|-------------|
| spark.driver.memory | 1g | Amount of memory to be used by the driver process. |
| spark.executor.memory | 1g | Amount of memory to be used per executor process. |
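
For example, a cluster with larger executors might raise both values in spark-defaults.conf. The sizes below are illustrative, not recommendations:

    spark.driver.memory     4g
    spark.executor.memory   8g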

Dynamic Configuration In CarbonData Using SET-RESET

SET/RESET commands are used to add, update, display, or reset the CarbonData properties dynamically without restarting the driver.

Syntax

  • Add or Update: This command adds or updates the value of parameter_name.
SET parameter_name=parameter_value
  • Display Property Value: This command displays the value of the specified parameter_name.
SET parameter_name
  • Display Session Parameters: This command displays all the supported session parameters.
SET
  • Display Session Parameters along with usage details: This command displays all the supported session parameters along with their usage details.
SET -v
  • Reset: This command clears all the session parameters.
RESET

Parameter Description:

| Parameter | Description |
|-----------|-------------|
| parameter_name | Name of the property whose value needs to be dynamically added, updated, or displayed. |
| parameter_value | New value of the parameter_name to be set. |

Dynamically Configurable Properties of CarbonData

| Properties | Description |
|------------|-------------|
| carbon.options.bad.records.logger.enable | Enables or disables the bad record logger. |
| carbon.options.bad.records.action | This property can have four types of actions for bad records: FORCE, REDIRECT, IGNORE, and FAIL. If set to FORCE, the data is auto-corrected by storing the bad records as NULL. If set to REDIRECT, bad records are written to the raw CSV instead of being loaded. If set to IGNORE, bad records are neither loaded nor written to the raw CSV. If set to FAIL, data loading fails if any bad records are found. |
| carbon.options.is.empty.data.bad.record | If false, empty ("" or '' or ,,) data is not considered a bad record, and vice versa. |
| carbon.options.batch.sort.size.inmb | Size of batch data to keep in memory. As a rule of thumb, it should be less than 45% of sort.inmemory.size.inmb, otherwise it may spill intermediate data to disk. |
| carbon.options.single.pass | Single-pass loading enables a single job to finish data loading with dictionary generation on the fly. It enhances performance in scenarios where subsequent data loading after the initial load involves fewer incremental updates to the dictionary. This option specifies whether to use a single pass for loading data. By default this option is set to FALSE. |
| carbon.options.bad.record.path | Specifies the HDFS path where bad records need to be stored. |
| carbon.custom.block.distribution | Specifies whether to use the Spark or Carbon block distribution feature. |
| enable.unsafe.sort | Specifies whether to use unsafe sort during data loading. Unsafe sort reduces garbage collection during the data load operation, resulting in better performance. |

Examples:

  • Add or Update:
SET enable.unsafe.sort=true
  • Display Property Value:
SET enable.unsafe.sort
  • Reset:
RESET
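
For example, a session could tighten bad-record handling dynamically before a load, without editing carbon.properties. The values and path below are illustrative:

    SET carbon.options.bad.records.logger.enable=true
    SET carbon.options.bad.records.action=REDIRECT
    SET carbon.options.bad.record.path=hdfs://10.18.101.155:54310/carbon/badrecords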

System Response:

  • Success will be recorded in the driver log.

  • Failure will be displayed in the UI.