CarbonData provides an SDK to facilitate:

1. Writing carbondata files from applications that do not use Spark
2. Reading carbondata files from applications that do not use Spark
The carbon jars package contains carbondata-store-sdk-x.x.x-SNAPSHOT.jar, which includes the SDK writer and reader. To use the SDK, the following jars are required in addition to carbondata-store-sdk-x.x.x-SNAPSHOT.jar: carbondata-core-x.x.x-SNAPSHOT.jar, carbondata-common-x.x.x-SNAPSHOT.jar, carbondata-format-x.x.x-SNAPSHOT.jar, carbondata-hadoop-x.x.x-SNAPSHOT.jar and carbondata-processing-x.x.x-SNAPSHOT.jar. Alternatively, carbondata-sdk.jar can be used directly.
# SDK Writer

The SDK writer writes carbondata and carbonindex files at a given path. External clients can use this writer to convert data in other formats, or live data, into carbondata and index files. The writer output contains only carbondata and carbonindex files; no metadata folder is created.
### Example with CSV format

```java
import java.io.IOException;

import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

public class TestSdk {

  // pass true or false while executing the main to use offheap memory or not
  public static void main(String[] args) throws IOException, InvalidLoadOptionException {
    if (args.length > 0 && args[0] != null) {
      testSdkWriter(args[0]);
    } else {
      testSdkWriter("true");
    }
  }

  public static void testSdkWriter(String enableOffheap) throws IOException, InvalidLoadOptionException {
    String path = "./target/testCSVSdkWriter";

    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);

    Schema schema = new Schema(fields);

    CarbonProperties.getInstance().addProperty("enable.offheap.sort", enableOffheap);

    CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path).withCsvInput(schema).writtenBy("SDK");

    CarbonWriter writer = builder.build();

    int rows = 5;
    for (int i = 0; i < rows; i++) {
      writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
    }
    writer.close();
  }
}
```
### Example with Avro format

```java
import java.io.IOException;

import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.sdk.file.CarbonWriter;

import org.apache.avro.generic.GenericData;
import org.apache.commons.lang.CharEncoding;

import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class TestSdkAvro {

  public static void main(String[] args) throws IOException, InvalidLoadOptionException {
    testSdkWriter();
  }

  public static void testSdkWriter() throws IOException, InvalidLoadOptionException {
    String path = "./AvroCarbonWriterSuiteWriteFiles";
    // Avro schema
    String avroSchema =
        "{" +
        "   \"type\" : \"record\"," +
        "   \"name\" : \"Acme\"," +
        "   \"fields\" : [" +
        "{ \"name\" : \"fname\", \"type\" : \"string\" }," +
        "{ \"name\" : \"age\", \"type\" : \"int\" }]" +
        "}";

    String json = "{\"fname\":\"bob\", \"age\":10}";

    // conversion to GenericData.Record
    JsonAvroConverter converter = new JsonAvroConverter();
    GenericData.Record record = converter.convertToGenericDataRecord(
        json.getBytes(CharEncoding.UTF_8), new org.apache.avro.Schema.Parser().parse(avroSchema));

    try {
      CarbonWriter writer = CarbonWriter.builder()
          .outputPath(path)
          .withAvroInput(new org.apache.avro.Schema.Parser().parse(avroSchema))
          .writtenBy("SDK")
          .build();

      for (int i = 0; i < 100; i++) {
        writer.write(record);
      }
      writer.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
```
### Example with JSON format

```java
import java.io.IOException;

import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

public class TestSdkJson {

  public static void main(String[] args) throws IOException, InvalidLoadOptionException {
    testJsonSdkWriter();
  }

  public static void testJsonSdkWriter() throws IOException, InvalidLoadOptionException {
    String path = "./target/testJsonSdkWriter";

    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);

    Schema carbonSchema = new Schema(fields);

    CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path).withJsonInput(carbonSchema).writtenBy("SDK");

    // initialize json writer with carbon schema
    CarbonWriter writer = builder.build();

    // one row of json data as string
    String jsonRow = "{\"name\":\"abcd\", \"age\":10}";

    int rows = 5;
    for (int i = 0; i < rows; i++) {
      writer.write(jsonRow);
    }
    writer.close();
  }
}
```
## Datatypes Mapping

Each SQL data type and Avro data type is mapped to an SDK data type. The mapping is as follows:
| SQL Data Type | Avro Data Type | Mapped SDK Data Type |
|---|---|---|
| BOOLEAN | BOOLEAN | DataTypes.BOOLEAN |
| SMALLINT | - | DataTypes.SHORT |
| INTEGER | INTEGER | DataTypes.INT |
| BIGINT | LONG | DataTypes.LONG |
| DOUBLE | DOUBLE | DataTypes.DOUBLE |
| VARCHAR | - | DataTypes.STRING |
| FLOAT | FLOAT | DataTypes.FLOAT |
| BYTE | - | DataTypes.BYTE |
| DATE | DATE | DataTypes.DATE |
| TIMESTAMP | - | DataTypes.TIMESTAMP |
| STRING | STRING | DataTypes.STRING |
| DECIMAL | DECIMAL | DataTypes.createDecimalType(precision, scale) |
| ARRAY | ARRAY | DataTypes.createArrayType(elementType) |
| STRUCT | RECORD | DataTypes.createStructType(fields) |
| - | ENUM | DataTypes.STRING |
| - | UNION | DataTypes.createStructType(types) |
| - | MAP | DataTypes.createMapType(keyType, valueType) |
| - | TimeMillis | DataTypes.INT |
| - | TimeMicros | DataTypes.LONG |
| - | TimestampMillis | DataTypes.TIMESTAMP |
| - | TimestampMicros | DataTypes.TIMESTAMP |
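As an illustration of the mapping above, a carbon `Schema` can be assembled from these SDK data types. A minimal sketch, in which the field names are hypothetical:

```java
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

public class TypeMappingSketch {
  public static Schema buildSchema() {
    Field[] fields = new Field[3];
    fields[0] = new Field("name", DataTypes.STRING);
    // DECIMAL maps to a decimal type with explicit precision and scale
    fields[1] = new Field("salary", DataTypes.createDecimalType(10, 2));
    // ARRAY maps to an array type parameterized by its element type
    fields[2] = new Field("phoneNumbers", DataTypes.createArrayType(DataTypes.STRING));
    return new Schema(fields);
  }
}
```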
NOTE:
Carbon supports the following logical types of Avro:

a. Date: The date logical type represents a date within the calendar, with no reference to a particular time zone or time of day. A date logical type annotates an Avro int, where the int stores the number of days from the Unix epoch, 1 January 1970 (ISO calendar).

b. Timestamp (millisecond precision): The timestamp-millis logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one millisecond. A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds from the Unix epoch, 1 January 1970 00:00:00.000 UTC.

c. Timestamp (microsecond precision): The timestamp-micros logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one microsecond. A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds from the Unix epoch, 1 January 1970 00:00:00.000000 UTC.

d. Decimal: The decimal logical type represents an arbitrary-precision signed decimal number of the form `unscaled × 10^-scale`. A decimal logical type annotates Avro bytes or fixed types. The byte array must contain the two's-complement representation of the unscaled integer value in big-endian byte order. The scale is fixed, and is specified using an attribute.

e. Time (millisecond precision): The time-millis logical type represents a time of day, with no reference to a particular calendar, time zone or date, with a precision of one millisecond. A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds after midnight, 00:00:00.000.

f. Time (microsecond precision): The time-micros logical type represents a time of day, with no reference to a particular calendar, time zone or date, with a precision of one microsecond. A time-micros logical type annotates an Avro long, where the long stores the number of microseconds after midnight, 00:00:00.000000.
Currently, Carbon does not validate the values of logical types; it expects that the Avro record passed by the user has already been validated by Avro record generator tools.
If the string data exceeds 32K in length, use withTableProperties() with the "long_string_columns" property (see the sketch after these notes), or use DataTypes.VARCHAR directly if a carbon schema is used.
Avro Bytes, Fixed and Duration data types are not yet supported.
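A minimal sketch of enabling a long string column through withTableProperties(); the column name and output path below are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

public class LongStringSketch {
  public static CarbonWriter buildWriter() throws Exception {
    Schema schema = new Schema(new Field[]{
        new Field("id", DataTypes.INT),
        new Field("description", DataTypes.STRING)   // values may exceed 32K characters
    });
    Map<String, String> tableProperties = new HashMap<>();
    // mark "description" so that values longer than 32K are accepted
    tableProperties.put("long_string_columns", "description");
    return CarbonWriter.builder()
        .outputPath("./target/longStringExample")    // hypothetical output path
        .withTableProperties(tableProperties)
        .withCsvInput(schema)
        .writtenBy("SDK")
        .build();
  }
}
```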
## Run SQL on files directly

Instead of creating a table and querying it, you can also query the file directly with SQL.
```sql
SELECT * FROM carbonfile.`$Path`
```
Find example code at DirectSQLExample in the CarbonData repo.
## API List

### Class org.apache.carbondata.sdk.file.CarbonWriterBuilder

```java
/**
 * Sets the output path of the writer builder
 *
 * @param path is the absolute path where output files are written
 *             This method must be called when building CarbonWriterBuilder
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder outputPath(String path);
```
```java
/**
 * Sets the timestamp in the carbondata and carbonindex index files
 *
 * @param UUID is a timestamp to be used in the carbondata and carbonindex index files.
 *             By default set to zero.
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder uniqueIdentifier(long UUID);
```
```java
/**
 * Sets the carbondata file size in MB
 *
 * @param blockSize is the size in MB, between 1 MB and 2048 MB;
 *                  default value is 1024 MB
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder withBlockSize(int blockSize);
```
```java
/**
 * Sets the blocklet size of the carbondata file
 *
 * @param blockletSize is the blocklet size in MB;
 *                     default value is 64 MB
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder withBlockletSize(int blockletSize);
```
```java
/**
 * @param enableLocalDictionary enable local dictionary, default is false
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder enableLocalDictionary(boolean enableLocalDictionary);
```
```java
/**
 * @param localDictionaryThreshold is the local dictionary threshold, default is 10000
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder localDictionaryThreshold(int localDictionaryThreshold);
```
```java
/**
 * Sets the list of columns that need to be in sorted order
 *
 * @param sortColumns is a string array of columns that need to be sorted.
 *                    If it is null, all dimensions are selected for sorting (the default).
 *                    If it is an empty array, no columns are sorted.
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder sortBy(String[] sortColumns);
```
```java
/**
 * Sets the taskNo for the writer. SDK writers running concurrently
 * should set distinct taskNo values to avoid conflicts in file names during write.
 *
 * @param taskNo is the taskNo the user wants to specify;
 *               by default it is the system time in nanoseconds.
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder taskNo(long taskNo);
```
```java
/**
 * To support the load options for the sdk writer
 *
 * @param options key,value pairs of load options.
 *                Supported keys are:
 *                a. bad_records_logger_enable -- true (write into separate logs), false
 *                b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
 *                c. bad_record_path -- path
 *                d. dateformat -- same as Java SimpleDateFormat
 *                e. timestampformat -- same as Java SimpleDateFormat
 *                f. complex_delimiter_level_1 -- value to split the complex type data
 *                g. complex_delimiter_level_2 -- value to split the nested complex type data
 *                h. quotechar
 *                i. escapechar
 *
 *                Default values are as follows:
 *                a. bad_records_logger_enable -- "false"
 *                b. bad_records_action -- "FAIL"
 *                c. bad_record_path -- ""
 *                d. dateformat -- "", taken from the carbon.properties file
 *                e. timestampformat -- "", taken from the carbon.properties file
 *                f. complex_delimiter_level_1 -- "$"
 *                g. complex_delimiter_level_2 -- ":"
 *                h. quotechar -- "\""
 *                i. escapechar -- "\\"
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder withLoadOptions(Map<String, String> options);
```
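For instance, bad-record handling and a date format might be configured as in this sketch; the path and pattern are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.carbondata.sdk.file.CarbonWriterBuilder;

public class LoadOptionsSketch {
  public static CarbonWriterBuilder configure(CarbonWriterBuilder builder) {
    Map<String, String> options = new HashMap<>();
    options.put("bad_records_action", "REDIRECT");          // redirect bad records instead of failing
    options.put("bad_record_path", "./target/badRecords");  // hypothetical path for redirected records
    options.put("dateformat", "yyyy-MM-dd");                // Java SimpleDateFormat pattern
    return builder.withLoadOptions(options);
  }
}
```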
```java
/**
 * To support the table properties for the sdk writer
 *
 * @param options key,value pairs of create table properties.
 *                Supported keys are:
 *                a. table_blocksize -- [1-2048] value in MB. Default is 1024.
 *                b. table_blocklet_size -- value in MB. Default is 64 MB.
 *                c. local_dictionary_threshold -- positive value, default is 10000
 *                d. local_dictionary_enable -- true / false. Default is false.
 *                e. sort_columns -- comma separated columns, e.g. "c1,c2".
 *                   By default all dimensions are sorted; if an empty string "" is passed,
 *                   no columns are sorted.
 *                f. sort_scope -- "local_sort", "no_sort", "batch_sort". Default is "local_sort".
 *                g. long_string_columns -- comma separated string columns which are more than 32K
 *                   in length. Default is null.
 *                h. inverted_index -- comma separated string columns for which an inverted index
 *                   needs to be generated
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder withTableProperties(Map<String, String> options);
```
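A sketch of setting a few of these properties; the values and the column name are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.carbondata.sdk.file.CarbonWriterBuilder;

public class TablePropertiesSketch {
  public static CarbonWriterBuilder configure(CarbonWriterBuilder builder) {
    Map<String, String> tableProperties = new HashMap<>();
    tableProperties.put("table_blocksize", "512");   // carbondata file size in MB
    tableProperties.put("sort_columns", "name");     // sort only on the hypothetical "name" column
    tableProperties.put("sort_scope", "local_sort"); // the default sort scope
    return builder.withTableProperties(tableProperties);
  }
}
```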
```java
/**
 * To make the sdk writer thread safe
 *
 * @param numOfThreads is the number of threads from which the writer will be called
 *                     in a multi-thread scenario.
 *                     By default the sdk writer is not thread safe:
 *                     one writer instance can be used in one thread only.
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder withThreadSafe(short numOfThreads);
```
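A sketch of sharing one writer across multiple threads; it assumes the writer was built with `.withThreadSafe((short) 4)`, and the thread count and row content are illustrative:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.carbondata.sdk.file.CarbonWriter;

public class ThreadSafeWriteSketch {
  public static void writeConcurrently(CarbonWriter writer) throws Exception {
    short numThreads = 4;  // must match the value passed to withThreadSafe()
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    for (int t = 0; t < numThreads; t++) {
      executor.submit(() -> {
        try {
          for (int i = 0; i < 10; i++) {
            writer.write(new String[]{"robot", String.valueOf(i)});
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      });
    }
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.MINUTES);
    writer.close();
  }
}
```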
```java
/**
 * To support hadoop configuration
 *
 * @param conf hadoop configuration; s3a access key, secret key, endpoint
 *             and other configuration can be set with this
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder withHadoopConf(Configuration conf);
```
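A sketch of passing S3A settings this way; the property keys are standard Hadoop S3A configuration, while the credentials and bucket path are placeholders:

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.CarbonWriterBuilder;

public class S3ConfSketch {
  public static CarbonWriterBuilder forS3() {
    Configuration conf = new Configuration();
    // standard Hadoop S3A settings; fill in real credentials and endpoint
    conf.set("fs.s3a.access.key", "<access-key>");
    conf.set("fs.s3a.secret.key", "<secret-key>");
    conf.set("fs.s3a.endpoint", "<endpoint>");
    return CarbonWriter.builder()
        .outputPath("s3a://bucket/carbon-output")  // hypothetical bucket path
        .withHadoopConf(conf);
  }
}
```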
```java
/**
 * To build a {@link CarbonWriter} which accepts rows in CSV format
 *
 * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder withCsvInput(Schema schema);
```
```java
/**
 * To build a {@link CarbonWriter} which accepts Avro objects
 *
 * @param avroSchema avro Schema object {org.apache.avro.Schema}
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema);
```
```java
/**
 * To build a {@link CarbonWriter} which accepts Json objects
 *
 * @param carbonSchema carbon Schema object
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder withJsonInput(Schema carbonSchema);
```
```java
/**
 * Records the name of the application that is writing the carbondata files.
 * This is a mandatory API to call; otherwise the build() call fails with an error.
 *
 * @param appName name of the application which is writing the carbondata files
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder writtenBy(String appName);
```
```java
/**
 * Sets the list of columns for which an inverted index needs to be generated
 *
 * @param invertedIndexColumns is a string array of columns for which an inverted index
 *                             needs to be generated.
 *                             If it is null or an empty array, no inverted index is
 *                             generated for any column.
 * @return updated CarbonWriterBuilder
 */
public CarbonWriterBuilder invertedIndexFor(String[] invertedIndexColumns);
```
```java
/**
 * Build a {@link CarbonWriter}.
 * This writer is not thread safe;
 * use the withThreadSafe() configuration in a multi-thread environment.
 *
 * @return CarbonWriter {AvroCarbonWriter / CSVCarbonWriter / JsonCarbonWriter, based on input type}
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
public CarbonWriter build() throws IOException, InvalidLoadOptionException;
```
```java
/**
 * Configure Row Record Reader for reading.
 */
public CarbonReaderBuilder withRowRecordReader();
```
### Class org.apache.carbondata.sdk.file.CarbonWriter

```java
/**
 * Write an object to the file; the format of the object depends on the implementation.
 * If AvroCarbonWriter, object is of type org.apache.avro.generic.GenericData.Record,
 * which is one row of data.
 * If CSVCarbonWriter, object is of type String[], which is one row of data.
 * If JsonCarbonWriter, object is of type String, which is one row of json.
 *
 * @param object one row of data
 * @throws IOException
 */
public abstract void write(Object object) throws IOException;
```
```java
/**
 * Flush and close the writer
 */
public abstract void close() throws IOException;
```
```java
/**
 * Create a {@link CarbonWriterBuilder} to build a {@link CarbonWriter}
 */
public static CarbonWriterBuilder builder() {
  return new CarbonWriterBuilder();
}
```
### Class org.apache.carbondata.sdk.file.Field

```java
/**
 * Field constructor
 *
 * @param name name of the field
 * @param type datatype of the field, specified as a string
 */
public Field(String name, String type);
```
```java
/**
 * Field constructor
 *
 * @param name name of the field
 * @param type datatype of the field, of class DataType
 */
public Field(String name, DataType type);
```
### Class org.apache.carbondata.sdk.file.Schema

```java
/**
 * Construct a schema with fields
 *
 * @param fields fields of the schema
 */
public Schema(Field[] fields);
```
```java
/**
 * Create a Schema using a JSON string, for example:
 * [
 *   {"name":"string"},
 *   {"age":"int"}
 * ]
 *
 * @param json specified as string
 * @return Schema
 */
public static Schema parseJson(String json);
```
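For example, a minimal sketch building the same two-column schema as the CSV writer example from the JSON form above:

```java
import org.apache.carbondata.sdk.file.Schema;

public class ParseJsonSketch {
  public static Schema build() {
    // equivalent to the fields "name" (string) and "age" (int)
    return Schema.parseJson("[{\"name\":\"string\"}, {\"age\":\"int\"}]");
  }
}
```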
### Class org.apache.carbondata.sdk.file.AvroCarbonWriter

```java
/**
 * Converts an avro schema to a carbon schema, required by carbonWriter
 *
 * @param avroSchemaString json formatted avro schema as string
 * @return carbon sdk schema
 */
public static org.apache.carbondata.sdk.file.Schema getCarbonSchemaFromAvroSchema(String avroSchemaString);
```
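A minimal sketch converting the Avro schema string from the Avro writer example into a carbon SDK schema:

```java
import org.apache.carbondata.sdk.file.AvroCarbonWriter;
import org.apache.carbondata.sdk.file.Schema;

public class AvroSchemaConversionSketch {
  public static Schema convert() {
    // same record schema as in the Avro writer example above
    String avroSchema = "{\"type\":\"record\",\"name\":\"Acme\",\"fields\":["
        + "{\"name\":\"fname\",\"type\":\"string\"},"
        + "{\"name\":\"age\",\"type\":\"int\"}]}";
    return AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
  }
}
```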
# SDK Reader

The SDK reader reads carbondata and carbonindex files at a given path. External clients can use this reader to read carbondata files without a CarbonSession.
```java
// 1. Create carbon reader
String path = "./testWriteFiles";
CarbonReader reader = CarbonReader
    .builder(path, "_temp")
    .projection(new String[]{"stringField", "shortField", "intField", "longField",
        "doubleField", "boolField", "dateField", "timeField", "decimalField"})
    .build();

// 2. Read data
long day = 24L * 3600 * 1000;
int i = 0;
while (reader.hasNext()) {
  Object[] row = (Object[]) reader.readNextRow();
  System.out.println(String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t",
      i, row[0], row[1], row[2], row[3], row[4], row[5],
      new Date((day * ((int) row[6]))), new Timestamp((long) row[7] / 1000), row[8]));
  i++;
}

// 3. Close this reader
reader.close();
```
Find example code at CarbonReaderExample in the CarbonData repo.
## API List

### Class org.apache.carbondata.sdk.file.CarbonReader

```java
/**
 * Return a new {@link CarbonReaderBuilder} instance
 *
 * @param tablePath table store path
 * @param tableName table name
 * @return CarbonReaderBuilder object
 */
public static CarbonReaderBuilder builder(String tablePath, String tableName);
```
```java
/**
 * Return a new CarbonReaderBuilder instance.
 * The default table name is table + tablePath + time.
 *
 * @param tablePath table path
 * @return CarbonReaderBuilder object
 */
public static CarbonReaderBuilder builder(String tablePath);
```
```java
/**
 * Breaks the list of CarbonRecordReader in CarbonReader into multiple
 * CarbonReader objects, each iterating through some 'carbondata' files,
 * and returns that list of CarbonReader objects.
 *
 * If the number of files is greater than maxSplits, breaks the
 * CarbonReader into maxSplits splits, with each split iterating
 * through >= 1 file.
 *
 * If the number of files is less than maxSplits, returns a list of
 * CarbonReaders with size equal to the number of files, with each
 * CarbonReader iterating through exactly one file.
 *
 * @param maxSplits maximum number of splits
 * @return list of CarbonReader objects
 */
public List<CarbonReader> split(int maxSplits);
```
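A usage sketch, reusing hasNext()/readNextRow()/close() from the reader example above; the split count is illustrative:

```java
import java.util.List;

import org.apache.carbondata.sdk.file.CarbonReader;

public class SplitReadSketch {
  public static void readInSplits(CarbonReader reader) throws Exception {
    // break one reader into at most 4 readers, each covering >= 1 carbondata file
    List<CarbonReader> splits = reader.split(4);
    for (CarbonReader split : splits) {
      while (split.hasNext()) {
        Object[] row = (Object[]) split.readNextRow();
        // process one row ...
      }
      split.close();
    }
  }
}
```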
### Class org.apache.carbondata.sdk.file.CarbonReaderBuilder
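The reader example above already exercises this builder; a minimal configuration sketch (the paths and projected column names are illustrative):

```java
import org.apache.carbondata.sdk.file.CarbonReader;

public class ReaderBuilderSketch {
  public static CarbonReader build() throws Exception {
    return CarbonReader
        .builder("./testWriteFiles", "_temp")      // table path and table name
        .projection(new String[]{"name", "age"})   // columns to read (hypothetical)
        .build();
  }
}
```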
### Class org.apache.carbondata.sdk.file.CarbonSchemaReader
### Class org.apache.carbondata.sdk.file.Schema
### Class org.apache.carbondata.sdk.file.Field
Find S3 example code at [SDKS3Example](https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java) in the CarbonData repo.

# Common API List for CarbonReader and CarbonWriter

### Class org.apache.carbondata.core.util.CarbonProperties
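A minimal usage sketch: getInstance() and addProperty() appear in the CSV writer example above, while getProperty(String) is assumed to have this signature:

```java
import org.apache.carbondata.core.util.CarbonProperties;

public class CarbonPropertiesSketch {
  public static void configure() {
    // set a carbon property for this JVM and read it back
    CarbonProperties.getInstance().addProperty("enable.offheap.sort", "true");
    String value = CarbonProperties.getInstance().getProperty("enable.offheap.sort"); // assumed getter
    System.out.println("enable.offheap.sort = " + value);
  }
}
```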
Reference: [list of carbon properties](./configuration-parameters.md)