Download and unzip spark-2.2.0-bin-hadoop2.7.tgz, then export the SPARK_HOME environment variable.
Package the CarbonData jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars:

```shell
mvn clean package -DskipTests -Pspark-2.2
```
Start a socket data server in a terminal:

```shell
nc -lk 9099
```
Type some CSV rows as follows:

```
1,col1
2,col2
3,col3
4,col4
5,col5
```
Start spark-shell in a new terminal, type `:paste`, then copy and run the following code.
```scala
import java.io.File

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.util.path.CarbonStorePath

val warehouse = new File("./warehouse").getCanonicalPath
val metastore = new File("./metastore").getCanonicalPath

val spark = SparkSession
  .builder()
  .master("local")
  .appName("StreamExample")
  .config("spark.sql.warehouse.dir", warehouse)
  .getOrCreateCarbonSession(warehouse, metastore)

spark.sparkContext.setLogLevel("ERROR")

// drop table if exists previously
spark.sql(s"DROP TABLE IF EXISTS carbon_table")

// Create target carbon table and populate with initial data
spark.sql(
  s"""
     | CREATE TABLE carbon_table (
     |   col1 INT,
     |   col2 STRING
     | )
     | STORED BY 'carbondata'
     | TBLPROPERTIES('streaming'='true')""".stripMargin)

val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)

// Create a streaming DataFrame reading from the socket server
var qry: StreamingQuery = null
val readSocketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9099)
  .load()

// Write data from socket stream to carbondata file
qry = readSocketDF.writeStream
  .format("carbondata")
  .trigger(ProcessingTime("5 seconds"))
  .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
  .option("dbName", "default")
  .option("tableName", "carbon_table")
  .start()

// start new thread to show data
new Thread() {
  override def run(): Unit = {
    do {
      spark.sql("select * from carbon_table").show(false)
      Thread.sleep(10000)
    } while (true)
  }
}.start()

qry.awaitTermination()
```
Continue to type rows into the data server, and the spark-shell will show the new data of the table.
A streaming table is just a normal CarbonData table with the “streaming” table property set. Users can create a streaming table using the following DDL:
```sql
CREATE TABLE streaming_table (
  col1 INT,
  col2 STRING
)
STORED BY 'carbondata'
TBLPROPERTIES('streaming'='true')
```
property name | default | description |
---|---|---|
streaming | false | Whether to enable the streaming ingest feature for this table.<br>Value range: true, false |
The “DESC FORMATTED” command will show the streaming property:

```sql
DESC FORMATTED streaming_table
```
For an existing table, use the ALTER TABLE command to set the streaming property:

```sql
ALTER TABLE streaming_table SET TBLPROPERTIES('streaming'='true')
```
At the beginning of streaming ingestion, the system tries to acquire the table-level lock on the streaming.lock file. If it cannot acquire the lock for the table, it throws an InterruptedException.
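A minimal sketch of handling this failure, reusing `qry` from the quick-start example above (the recovery strategy shown is illustrative, not part of CarbonData):

```scala
// Sketch: if another writer already holds the table-level streaming lock,
// the ingestion fails with an InterruptedException.
try {
  qry.awaitTermination()
} catch {
  case e: InterruptedException =>
    // Another process holds streaming.lock for this table;
    // stop this query and retry later, or fail the job.
    println(s"Streaming lock not acquired: ${e.getMessage}")
}
```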
The streaming input data is ingested into a segment of the CarbonData table whose status is streaming; CarbonData calls it a streaming segment. The “tablestatus” file records the segment status and data size. Users can run “SHOW SEGMENTS FOR TABLE tableName” to check segment status.
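For example, for the streaming table created above:

```sql
SHOW SEGMENTS FOR TABLE streaming_table
```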
After a streaming segment reaches the max size, CarbonData changes the segment status from “streaming” to “streaming finish”, and creates a new “streaming” segment to continue ingesting streaming data.
option | default | description |
---|---|---|
carbon.streaming.segment.max.size | 1024000000 | Unit: byte. The max size of a streaming segment. |
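This threshold can be overridden in the carbon.properties file; a sketch (the value shown is simply the default):

```
carbon.streaming.segment.max.size=1024000000
```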
segment status | description |
---|---|
streaming | The segment is running streaming ingestion |
streaming finish | The segment has finished streaming ingestion; it will be handed off to a segment in the columnar format |
Use the below command to change the status of a “streaming” segment to “streaming finish”:

```sql
ALTER TABLE streaming_table FINISH STREAMING
```
Use the below command to hand off a “streaming finish” segment to a columnar format segment manually:

```sql
ALTER TABLE streaming_table COMPACT 'streaming'
```
Configure the property “carbon.streaming.auto.handoff.enabled” to hand off streaming segments automatically. If this property is true, then after a streaming segment reaches the max size, CarbonData changes the segment to “streaming finish” status and triggers an automatic handoff to a columnar format segment in a new thread.
property name | default | description |
---|---|---|
carbon.streaming.auto.handoff.enabled | true | Whether to automatically trigger the handoff operation |
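If manual handoff via the ALTER TABLE command is preferred, auto handoff can be disabled in the carbon.properties file; a sketch:

```
carbon.streaming.auto.handoff.enabled=false
```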
Use the below command to hand off all streaming segments to columnar format segments and set the streaming property to false; the table then becomes a normal table:

```sql
ALTER TABLE streaming_table COMPACT 'close_streaming'
```