blob: 476b1bfb0f63f763fdd89d4150819c056edf596e [file] [log] [blame]
{
"paragraphs": [
{
"title": "Overview",
"text": "%md\n\nThis tutorial demonstrates how to use Flink to do stream processing via its streaming SQL + UDF. In this tutorial, we read data from a Kafka topic, do some simple processing (just filtering here), and then write the result back to another Kafka topic. We use this [docker](https://zeppelin-kafka-connect-datagen.readthedocs.io/en/latest/) setup to create the Kafka cluster and the source data.\n\n* Make sure you add the following IP-to-hostname mapping to your hosts file; otherwise you may not be able to connect to the Kafka cluster running in Docker.\n\n```\n127.0.0.1 broker\n```\n\nUse the following command to generate the sample data.\n\n```\ncurl -X POST http://localhost:8083/connectors \\\n-H \u0027Content-Type:application/json\u0027 \\\n-H \u0027Accept:application/json\u0027 \\\n-d @connect.source.datagen.json\n```",
"user": "anonymous",
"dateUpdated": "2021-07-26 05:50:06.144",
"progress": 0,
"config": {
"runOnSelectionChange": true,
"title": true,
"checkEmpty": true,
"colWidth": 12.0,
"fontSize": 9.0,
"enabled": true,
"results": {},
"editorSetting": {
"language": "markdown",
"editOnDblClick": true,
"completionKey": "TAB",
"completionSupport": false
},
"editorMode": "ace/mode/markdown",
"editorHide": true,
"tableHide": false
},
"settings": {
"params": {},
"forms": {}
},
"results": {
"code": "SUCCESS",
"msg": [
{
"type": "HTML",
"data": "\u003cdiv class\u003d\"markdown-body\"\u003e\n\u003cp\u003eThis tutorial demonstrates how to use Flink to do stream processing via its streaming SQL + UDF. In this tutorial, we read data from a Kafka topic, do some simple processing (just filtering here), and then write the result back to another Kafka topic. We use this \u003ca href\u003d\"https://zeppelin-kafka-connect-datagen.readthedocs.io/en/latest/\"\u003edocker\u003c/a\u003e setup to create the Kafka cluster and the source data.\u003c/p\u003e\n\u003cul\u003e\n\u003cli\u003eMake sure you add the following IP-to-hostname mapping to your hosts file; otherwise you may not be able to connect to the Kafka cluster running in Docker.\u003c/li\u003e\n\u003c/ul\u003e\n\u003cpre\u003e\u003ccode\u003e127.0.0.1 broker\n\u003c/code\u003e\u003c/pre\u003e\n\u003cp\u003eUse the following command to generate the sample data.\u003c/p\u003e\n\u003cpre\u003e\u003ccode\u003ecurl -X POST http://localhost:8083/connectors \\\n-H \u0027Content-Type:application/json\u0027 \\\n-H \u0027Accept:application/json\u0027 \\\n-d @connect.source.datagen.json\n\u003c/code\u003e\u003c/pre\u003e\n\n\u003c/div\u003e"
}
]
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1579054287919_-61477360",
"id": "paragraph_1579054287919_-61477360",
"dateCreated": "2020-01-15 10:11:27.919",
"dateStarted": "2021-07-26 05:50:06.145",
"dateFinished": "2021-07-26 05:50:06.153",
"status": "FINISHED"
},
{
"title": "Configure Flink Kafka connector",
"text": "%flink.conf\n\n# You need to run this paragraph first before running any flink code.\n\nflink.execution.packages\torg.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0",
"user": "anonymous",
"dateUpdated": "2020-04-29 15:45:27.361",
"progress": 0,
"config": {
"colWidth": 12.0,
"fontSize": 9.0,
"enabled": true,
"results": {},
"editorSetting": {
"language": "text",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"editorMode": "ace/mode/text",
"title": true
},
"settings": {
"params": {},
"forms": {}
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1587959422055_1513725291",
"id": "paragraph_1587959422055_1513725291",
"dateCreated": "2020-04-27 11:50:22.055",
"dateStarted": "2020-04-29 15:45:27.366",
"dateFinished": "2020-04-29 15:45:27.369",
"status": "FINISHED"
},
{
"title": "Create kafka source table",
"text": "%flink.ssql\n\nDROP TABLE IF EXISTS source_kafka;\n\nCREATE TABLE source_kafka (\n status STRING,\n direction STRING,\n event_ts BIGINT\n) WITH (\n \u0027connector.type\u0027 \u003d \u0027kafka\u0027, \n \u0027connector.version\u0027 \u003d \u0027universal\u0027,\n \u0027connector.topic\u0027 \u003d \u0027generated.events\u0027,\n \u0027connector.startup-mode\u0027 \u003d \u0027earliest-offset\u0027,\n \u0027connector.properties.zookeeper.connect\u0027 \u003d \u0027localhost:2181\u0027,\n \u0027connector.properties.bootstrap.servers\u0027 \u003d \u0027localhost:9092\u0027,\n \u0027connector.properties.group.id\u0027 \u003d \u0027testGroup\u0027,\n \u0027connector.startup-mode\u0027 \u003d \u0027earliest-offset\u0027,\n \u0027format.type\u0027\u003d\u0027json\u0027,\n \u0027update-mode\u0027 \u003d \u0027append\u0027\n);",
"user": "anonymous",
"dateUpdated": "2020-04-29 15:45:29.234",
"progress": 0,
"config": {
"colWidth": 6.0,
"fontSize": 9.0,
"enabled": true,
"results": {},
"editorSetting": {
"language": "sql",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"editorMode": "ace/mode/sql",
"runOnSelectionChange": true,
"title": true,
"checkEmpty": true
},
"settings": {
"params": {},
"forms": {}
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1578044987529_1240899810",
"id": "paragraph_1578044987529_1240899810",
"dateCreated": "2020-01-03 17:49:47.529",
"dateStarted": "2020-04-29 15:45:29.238",
"dateFinished": "2020-04-29 15:45:42.005",
"status": "FINISHED"
},
{
"title": "Create kafka sink table",
"text": "%flink.ssql\n\nDROP TABLE IF EXISTS sink_kafka;\n\nCREATE TABLE sink_kafka (\n status STRING,\n direction STRING,\n event_ts TIMESTAMP(3),\n WATERMARK FOR event_ts AS event_ts - INTERVAL \u00275\u0027 SECOND\n) WITH (\n \u0027connector.type\u0027 \u003d \u0027kafka\u0027, \n \u0027connector.version\u0027 \u003d \u0027universal\u0027, \n \u0027connector.topic\u0027 \u003d \u0027generated.events2\u0027,\n \u0027connector.properties.zookeeper.connect\u0027 \u003d \u0027localhost:2181\u0027,\n \u0027connector.properties.bootstrap.servers\u0027 \u003d \u0027localhost:9092\u0027,\n \u0027connector.properties.group.id\u0027 \u003d \u0027testGroup\u0027,\n \u0027format.type\u0027\u003d\u0027json\u0027,\n \u0027update-mode\u0027 \u003d \u0027append\u0027\n)\n\n",
"user": "anonymous",
"dateUpdated": "2020-04-29 15:45:30.663",
"progress": 0,
"config": {
"runOnSelectionChange": true,
"title": true,
"checkEmpty": true,
"colWidth": 6.0,
"fontSize": 9.0,
"enabled": true,
"results": {},
"editorSetting": {
"language": "sql",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"editorMode": "ace/mode/sql"
},
"settings": {
"params": {},
"forms": {}
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1578905686087_1273839451",
"id": "paragraph_1578905686087_1273839451",
"dateCreated": "2020-01-13 16:54:46.087",
"dateStarted": "2020-04-29 15:45:41.561",
"dateFinished": "2020-04-29 15:45:42.005",
"status": "FINISHED"
},
{
"title": "Transform the data in the source table and write it to the sink table",
"text": "%flink.ssql\n\ninsert into sink_kafka select status, direction, cast(event_ts/1000000000 as timestamp(3)) from source_kafka where status \u003c\u003e \u0027foo\u0027\n",
"user": "anonymous",
"dateUpdated": "2020-04-29 15:45:43.388",
"progress": 0,
"config": {
"runOnSelectionChange": true,
"title": true,
"checkEmpty": true,
"colWidth": 12.0,
"fontSize": 9.0,
"enabled": true,
"results": {},
"editorSetting": {
"language": "sql",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"editorMode": "ace/mode/sql",
"editorHide": false
},
"settings": {
"params": {},
"forms": {}
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1578905715189_33634195",
"id": "paragraph_1578905715189_33634195",
"dateCreated": "2020-01-13 16:55:15.189",
"dateStarted": "2020-04-29 15:45:43.391",
"dateFinished": "2020-04-29 16:06:27.181",
"status": "ABORT"
},
{
"title": "Preview sink table result",
"text": "%flink.ssql(type\u003dupdate)\n\nselect * from sink_kafka order by event_ts desc limit 10;",
"user": "anonymous",
"dateUpdated": "2020-04-29 15:28:01.122",
"progress": 0,
"config": {
"runOnSelectionChange": true,
"title": true,
"checkEmpty": true,
"colWidth": 12.0,
"fontSize": 9.0,
"enabled": true,
"results": {
"0": {
"graph": {
"mode": "table",
"height": 300.0,
"optionOpen": false,
"setting": {
"table": {
"tableGridState": {
"columns": [
{
"name": "status0",
"visible": true,
"width": "*",
"sort": {},
"filters": [
{}
],
"pinned": ""
},
{
"name": "direction1",
"visible": true,
"width": "*",
"sort": {
"priority": 0.0,
"direction": "asc"
},
"filters": [
{}
],
"pinned": ""
},
{
"name": "event_ts2",
"visible": true,
"width": "*",
"sort": {},
"filters": [
{}
],
"pinned": ""
}
],
"scrollFocus": {},
"selection": [],
"grouping": {
"grouping": [],
"aggregations": [],
"rowExpandedStates": {}
},
"treeView": {},
"pagination": {
"paginationCurrentPage": 1.0,
"paginationPageSize": 250.0
}
},
"tableColumnTypeState": {
"names": {
"status": "string",
"direction": "string",
"event_ts": "string"
},
"updated": false
},
"tableOptionSpecHash": "[{\"name\":\"useFilter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable filter for columns\"},{\"name\":\"showPagination\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable pagination for better navigation\"},{\"name\":\"showAggregationFooter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable a footer for displaying aggregated values\"}]",
"tableOptionValue": {
"useFilter": false,
"showPagination": false,
"showAggregationFooter": false
},
"updated": false,
"initialized": false
}
},
"commonSetting": {}
}
}
},
"editorSetting": {
"language": "sql",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"editorMode": "ace/mode/sql",
"type": "update"
},
"settings": {
"params": {},
"forms": {}
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1579058345516_-1005807622",
"id": "paragraph_1579058345516_-1005807622",
"dateCreated": "2020-01-15 11:19:05.518",
"dateStarted": "2020-04-29 15:28:01.131",
"dateFinished": "2020-04-29 15:28:15.162",
"status": "ABORT"
},
{
"text": "%flink.ssql\n",
"user": "anonymous",
"dateUpdated": "2020-04-29 15:27:31.430",
"progress": 0,
"config": {
"colWidth": 12.0,
"fontSize": 9.0,
"enabled": true,
"results": {},
"editorSetting": {
"language": "sql",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"editorMode": "ace/mode/sql"
},
"settings": {
"params": {},
"forms": {}
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1579058056677_-1981512536",
"id": "paragraph_1579058056677_-1981512536",
"dateCreated": "2020-01-15 11:14:16.685",
"status": "READY"
}
],
"name": "4. Streaming ETL",
"id": "2EYD56B9B",
"defaultInterpreterGroup": "flink",
"version": "0.9.0-SNAPSHOT",
"noteParams": {},
"noteForms": {},
"angularObjects": {},
"config": {
"isZeppelinNotebookCronEnable": false
},
"info": {}
}