The CarbonData flink integration module is used to connect Flink and Carbon. The module provides a set of Flink BulkWriter implementations (CarbonLocalWriter (for Local and HDFS filesystems) and CarbonS3Writer). The data is processed by the Flink, and finally written into the stage directory of the target table by the CarbonXXXWriter.
By default, those data in table stage directory, can not be immediately queried, those data can be queried after the INSERT INTO $tableName STAGE
command is executed.
Since the flink data written to carbon is endless, in order to ensure the visibility of data and the controllable amount of data processed during the execution of each insert form stage command, the user should execute the insert from stage command in a timely manner.
The execution interval of the insert form stage command should take the data visibility requirements of the actual business and the flink data traffic. When the data visibility requirements are high or the data traffic is large, the execution interval should be appropriately shortened.
A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon, for subsequent analysis and queries.
Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
Pseudo code and description:
// Import dependencies. import java.util.Properties import org.apache.carbon.flink.CarbonWriterFactory import org.apache.carbon.flink.ProxyFileSystem import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink // Specify database name. val databaseName = "default" // Specify target table name. val tableName = "test" // Table path of the target table. val tablePath = "/data/warehouse/test" // Specify local temporary path. val dataTempPath = "/data/temp/" val tableProperties = new Properties // Set the table properties here. val writerProperties = new Properties // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc. val carbonProperties = new Properties // Set the carbon properties here, such as date format, store location, etc. // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'. // Use CarbonWriterFactory.builder("Local") for Local and Hdfs File systems val writerFactory = CarbonWriterFactory.builder("Local").build( databaseName, tableName, tablePath, tableProperties, writerProperties, carbonProperties ) // Build a flink stream and run it. // 1. Create a new flink execution environment. val environment = StreamExecutionEnvironment.getExecutionEnvironment // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc. // 2. Create flink data source, may be a kafka source, custom source, or others. // The data type of source should be Array[AnyRef]. // Array length should equals to table column count, and values order in array should matches table column order. val source = ... // 3. Create flink stream and set source. val stream = environment.addSource(source) // 4. Add other flink operators here. // ... // 5. Set flink stream target (write data to carbon with a write sink). stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build) // 6. Run flink stream. try { environment.execute } catch { case exception: Exception => // Handle execute exception here. }
Property | Name | Description |
---|---|---|
CarbonLocalProperty.DATA_TEMP_PATH | carbon.writer.local.data.temp.path | Usually is a local path, data will write to temp path first, and move to target data path finally. |
CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
Property | Name | Description |
---|---|---|
CarbonS3Property.ACCESS_KEY | carbon.writer.s3.access.key | Access key of s3 file system |
CarbonS3Property.SECRET_KEY | carbon.writer.s3.secret.key | Secret key of s3 file system |
CarbonS3Property.ENDPOINT | carbon.writer.s3.endpoint | Endpoint of s3 file system |
CarbonS3Property.DATA_TEMP_PATH | carbon.writer.s3.data.temp.path | Usually is a local path, data will write to temp path first, and move to target data path finally. |
CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
Refer Grammar Description for syntax.
Create target table.
CREATE TABLE test (col1 string, col2 string, col3 string) STORED AS carbondata
Writing flink data to local carbon table.
import java.util.Properties import org.apache.carbon.flink.CarbonLocalProperty import org.apache.carbon.flink.CarbonWriterFactory import org.apache.carbon.flink.ProxyFileSystem import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.flink.streaming.api.functions.source.SourceFunction val databaseName = "default" val tableName = "test" val tablePath = "/data/warehouse/test" val dataTempPath = "/data/temp/" val tableProperties = new Properties val writerProperties = new Properties writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath) val carbonProperties = new Properties carbonProperties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) carbonProperties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024") val writerFactory = CarbonWriterFactory.builder("Local").build( databaseName, tableName, tablePath, tableProperties, writerProperties, carbonProperties ) val environment = StreamExecutionEnvironment.getExecutionEnvironment environment.setParallelism(1) environment.enableCheckpointing(2000L) environment.setRestartStrategy(RestartStrategies.noRestart) // Define a custom source. val source = new SourceFunction[Array[AnyRef]]() { override def run(sourceContext: SourceFunction.SourceContext[Array[AnyRef]]): Unit = { // Array length should equals to table column count, and values order in array should matches table column order. val data = new Array[AnyRef](3) data(0) = "value1" data(1) = "value2" data(2) = "value3" sourceContext.collect(data) } override def cancel(): Unit = { // do something. } } val stream = environment.addSource(source) val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build stream.addSink(streamSink) try { environment.execute } catch { case exception: Exception => // TODO throw new UnsupportedOperationException(exception) }
Insert into table from stage directory.
INSERT INTO test STAGE