CarbonData DML statements are documented here,which includes:
This command is used to load csv files to carbondata, OPTIONS are not mandatory for data loading process.
LOAD DATA [LOCAL] INPATH 'folder_path' INTO TABLE [db_name.]table_name OPTIONS(property_name=property_value, ...)
Supported Properties:
Property | Description |
---|---|
DELIMITER | Character used to separate the data in the input csv file |
QUOTECHAR | Character used to quote the data in the input csv file |
COMMENTCHAR | Character used to comment the rows in the input csv file. Those rows will be skipped from processing |
HEADER | Whether the input csv files have header row |
FILEHEADER | If header is not present in the input csv, what is the column names to be used for data read from input csv |
MULTILINE | Whether a row data can span across multiple lines. |
ESCAPECHAR | Escape character used to excape the data in input csv file.For eg.,\ is a standard escape character |
SKIP_EMPTY_LINE | Whether empty lines in input csv file should be skipped or loaded as null row |
COMPLEX_DELIMITER_LEVEL_1 | Starting delimiter for complex type data in input csv file |
COMPLEX_DELIMITER_LEVEL_2 | Ending delimiter for complex type data in input csv file |
ALL_DICTIONARY_PATH | Path to read the dictionary data from all columns |
COLUMNDICT | Path to read the dictionary data from for particular column |
DATEFORMAT | Format of date in the input csv file |
TIMESTAMPFORMAT | Format of timestamp in the input csv file |
SORT_COLUMN_BOUNDS | How to parititon the sort columns to make the evenly distributed |
SINGLE_PASS | When to enable single pass data loading |
BAD_RECORDS_LOGGER_ENABLE | Whether to enable bad records logging |
BAD_RECORD_PATH | Bad records logging path. Useful when bad record logging is enabled |
BAD_RECORDS_ACTION | Behavior of data loading when bad record is found |
IS_EMPTY_DATA_BAD_RECORD | Whether empty data of a column to be considered as bad record or not |
GLOBAL_SORT_PARTITIONS | Number of partition to use for shuffling of data during sorting |
You can use the following options to load data:
Delimiters can be provided in the load command.
OPTIONS('DELIMITER'=',')
Quote Characters can be provided in the load command.
OPTIONS('QUOTECHAR'='"')
Comment Characters can be provided in the load command if user want to comment lines.
OPTIONS('COMMENTCHAR'='#')
When you load the CSV file without the file header and the file header is the same with the table schema, then add ‘HEADER’=‘false’ to load data SQL as user need not provide the file header. By default the value is ‘true’. false: CSV file is without file header. true: CSV file is with file header.
OPTIONS('HEADER'='false')
NOTE: If the HEADER option exist and is set to ‘true’, then the FILEHEADER option is not required.
Headers can be provided in the LOAD DATA command if headers are missing in the source files.
OPTIONS('FILEHEADER'='column1,column2')
CSV with new line character in quotes.
OPTIONS('MULTILINE'='true')
Escape char can be provided if user want strict validation of escape character in CSV files.
OPTIONS('ESCAPECHAR'='\')
This option will ignore the empty line in the CSV file during the data load.
OPTIONS('SKIP_EMPTY_LINE'='TRUE/FALSE')
Split the complex type data column in a row (eg., a$b$c --> Array = {a,b,c}).
OPTIONS('COMPLEX_DELIMITER_LEVEL_1'='$')
Split the complex type nested data column in a row. Applies level_1 delimiter & applies level_2 based on complex data type (eg., a:b$c:d --> Array> = {{a,b},{c,d}}).
OPTIONS('COMPLEX_DELIMITER_LEVEL_2'=':')
All dictionary files path.
OPTIONS('ALL_DICTIONARY_PATH'='/opt/alldictionary/data.dictionary')
Dictionary file path for specified column.
OPTIONS('COLUMNDICT'='column1:dictionaryFilePath1,column2:dictionaryFilePath2')
NOTE: ALL_DICTIONARY_PATH and COLUMNDICT can't be used together.
Date and Timestamp format for specified column.
OPTIONS('DATEFORMAT' = 'yyyy-MM-dd','TIMESTAMPFORMAT'='yyyy-MM-dd HH:mm:ss')
NOTE: Date formats are specified by date pattern strings. The date pattern letters in CarbonData are same as in JAVA. Refer to SimpleDateFormat.
Range bounds for sort columns.
Suppose the table is created with ‘SORT_COLUMNS’=‘name,id’ and the range for name is aaa to zzz, the value range for id is 0 to 1000. Then during data loading, we can specify the following option to enhance data loading performance.
OPTIONS('SORT_COLUMN_BOUNDS'='f,250;l,500;r,750')
Each bound is separated by ‘;’ and each field value in bound is separated by ‘,’. In the example above, we provide 3 bounds to distribute records to 4 partitions. The values ‘f’,‘l’,‘r’ can evenly distribute the records. Inside carbondata, for a record we compare the value of sort columns with that of the bounds and decide which partition the record will be forwarded to.
NOTE:
Single Pass Loading enables single job to finish data loading with dictionary generation on the fly. It enhances performance in the scenarios where the subsequent data loading after initial load involves fewer incremental updates on the dictionary.
This option specifies whether to use single pass for loading data or not. By default this option is set to FALSE.
OPTIONS('SINGLE_PASS'='TRUE')
NOTE:
Example:
LOAD DATA local inpath '/opt/rawdata/data.csv' INTO table carbontable options('DELIMITER'=',', 'QUOTECHAR'='"','COMMENTCHAR'='#', 'HEADER'='false', 'FILEHEADER'='empno,empname,designation,doj,workgroupcategory, workgroupcategoryname,deptno,deptname,projectcode, projectjoindate,projectenddate,attendance,utilization,salary', 'MULTILINE'='true','ESCAPECHAR'='\','COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':', 'ALL_DICTIONARY_PATH'='/opt/alldictionary/data.dictionary', 'SINGLE_PASS'='TRUE')
Methods of handling bad records are as follows:
OPTIONS('BAD_RECORDS_LOGGER_ENABLE'='true', 'BAD_RECORD_PATH'='hdfs://hacluster/tmp/carbon', 'BAD_RECORDS_ACTION'='REDIRECT', 'IS_EMPTY_DATA_BAD_RECORD'='false')
NOTE:
Example:
LOAD DATA INPATH 'filepath.csv' INTO TABLE tablename OPTIONS('BAD_RECORDS_LOGGER_ENABLE'='true','BAD_RECORD_PATH'='hdfs://hacluster/tmp/carbon', 'BAD_RECORDS_ACTION'='REDIRECT','IS_EMPTY_DATA_BAD_RECORD'='false')
OPTIONS('GLOBAL_SORT_PARTITIONS'='2')
NOTE:
This command inserts data into a CarbonData table, it is defined as a combination of two queries Insert and Select query respectively. It inserts records from a source table into a target CarbonData table, the source table can be a Hive table, Parquet table or a CarbonData table itself. It comes with the functionality to aggregate the records of a table by performing Select query on source table and load its corresponding resultant records into a CarbonData table.
INSERT INTO TABLE <CARBONDATA TABLE> SELECT * FROM sourceTableName [ WHERE { <filter_condition> } ]
You can also omit the table
keyword and write your query as:
INSERT INTO <CARBONDATA TABLE> SELECT * FROM sourceTableName [ WHERE { <filter_condition> } ]
Overwrite insert data:
INSERT OVERWRITE TABLE <CARBONDATA TABLE> SELECT * FROM sourceTableName [ WHERE { <filter_condition> } ]
NOTE:
Examples
INSERT INTO table1 SELECT item1, sum(item2 + 1000) as result FROM table2 group by item1
INSERT INTO table1 SELECT item1, item2, item3 FROM table2 where item2='xyz'
INSERT OVERWRITE TABLE table1 SELECT * FROM TABLE2
This command allows you to load data using static partition.
LOAD DATA [LOCAL] INPATH 'folder_path' INTO TABLE [db_name.]table_name PARTITION (partition_spec) OPTIONS(property_name=property_value, ...) INSERT INTO INTO TABLE [db_name.]table_name PARTITION (partition_spec) <SELECT STATEMENT>
Example:
LOAD DATA LOCAL INPATH '${env:HOME}/staticinput.csv' INTO TABLE locationTable PARTITION (country = 'US', state = 'CA') INSERT INTO TABLE locationTable PARTITION (country = 'US', state = 'AL') SELECT <columns list excluding partition columns> FROM another_user
This command allows you to load data using dynamic partition. If partition spec is not specified, then the partition is considered as dynamic.
Example:
LOAD DATA LOCAL INPATH '${env:HOME}/staticinput.csv' INTO TABLE locationTable INSERT INTO TABLE locationTable SELECT <columns list excluding partition columns> FROM another_user
This command will allow to update the CarbonData table based on the column expression and optional filter conditions.
UPDATE <table_name> SET (column_name1, column_name2, ... column_name n) = (column1_expression , column2_expression, ... column n_expression ) [ WHERE { <filter_condition> } ]
alternatively the following command can also be used for updating the CarbonData Table :
UPDATE <table_name> SET (column_name1, column_name2) =(select sourceColumn1, sourceColumn2 from sourceTable [ WHERE { <filter_condition> } ] ) [ WHERE { <filter_condition> } ]
NOTE: The update command fails if multiple input rows in source table are matched with single row in destination table.
Examples:
UPDATE t3 SET (t3_salary) = (t3_salary + 9) WHERE t3_name = 'aaa1'
UPDATE t3 SET (t3_date, t3_country) = ('2017-11-18', 'india') WHERE t3_salary < 15003
UPDATE t3 SET (t3_country, t3_name) = (SELECT t5_country, t5_name FROM t5 WHERE t5_id = 5) WHERE t3_id < 5
UPDATE t3 SET (t3_date, t3_serialname, t3_salary) = (SELECT '2099-09-09', t5_serialname, '9999' FROM t5 WHERE t5_id = 5) WHERE t3_id < 5
UPDATE t3 SET (t3_country, t3_salary) = (SELECT t5_country, t5_salary FROM t5 FULL JOIN t3 u WHERE u.t3_id = t5_id and t5_id=6) WHERE t3_id >6
NOTE: Update Complex datatype columns is not supported.
This command allows us to delete records from CarbonData table.
DELETE FROM table_name [WHERE expression]
Examples:
DELETE FROM carbontable WHERE column1 = 'china'
DELETE FROM carbontable WHERE column1 IN ('china', 'USA')
DELETE FROM carbontable WHERE column1 IN (SELECT column11 FROM sourceTable2)
DELETE FROM carbontable WHERE column1 IN (SELECT column11 FROM sourceTable2 WHERE column1 = 'USA')
Compaction improves the query performance significantly.
There are several types of compaction.
ALTER TABLE [db_name.]table_name COMPACT 'MINOR/MAJOR/CUSTOM'
In Minor compaction, user can specify the number of loads to be merged. Minor compaction triggers for every data load if the parameter carbon.enable.auto.load.merge is set to true. If any segments are available to be merged, then compaction will run parallel with data load, there are 2 levels in minor compaction:
ALTER TABLE table_name COMPACT 'MINOR'
In Major compaction, multiple segments can be merged into one large segment. User will specify the compaction size until which segments can be merged, Major compaction is usually done during the off-peak time. Configure the property carbon.major.compaction.size with appropriate value in MB.
This command merges the specified number of segments into one segment:
ALTER TABLE table_name COMPACT 'MAJOR'
In Custom compaction, user can directly specify segment ids to be merged into one large segment. All specified segment ids should exist and be valid, otherwise compaction will fail. Custom compaction is usually done during the off-peak time.
ALTER TABLE table_name COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (2,3,4)
NOTE: Compaction is unsupported for table containing Complex columns.
Clean the segments which are compacted:
CLEAN FILES FOR TABLE carbon_table