tree: 67f1fa5bf66d9833e7bdcaf96914659798b8e8e7 [path history] [tgz]
  1. src/
  2. pom.xml
  3. README.md
pinot-connectors/pinot-flink-connector/README.md

Flink-Pinot Connector

Flink connector to write data to Pinot directly. This is useful for backfilling or bootstrapping tables, including the upsert tables. You can read more about the motivation and design in this design proposal.

Quick Start

StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(2);
DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO).keyBy(r -> r.getField(0));

PinotControllerClient client = new PinotControllerClient();
// fetch Pinot schema
Schema schema = PinotConnectionUtils.getSchema(client, "starbucksStores");
// fetch Pinot table config
TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "OFFLINE");
// create Flink Pinot Sink
srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema));
execEnv.execute();

For more examples, please see src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java

Notes for backfilling upsert table

  • To correctly partition the output segments by the primary key, the Flink job must also include the partitionByKey operator before the Sink operator
  • The parallelism of the job must be set the same as the number of partitions of the Pinot table, so that the sink in each task executor can generate the segment of same partitions.
  • It’s important to plan the resource usage to avoid capacity issues such as out of memory. In particular, Pinot sink has an in-memory buffer of records, and it flushes when the threshold is reached. Currently, the threshold on the number of records is supported via the config of segmentFlushMaxNumRecords. In the future, we could add other types of threshold such as the memory usage of the buffer.