A Flink connector to write data to Pinot directly. This is useful for backfilling or bootstrapping tables, including upsert tables. You can read more about the motivation and design in this design proposal.
```java
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(2);
DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO).keyBy(r -> r.getField(0));
PinotControllerClient client = new PinotControllerClient();

// fetch Pinot schema
Schema schema = PinotConnectionUtils.getSchema(client, "starbucksStores");
// fetch Pinot table config
TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "OFFLINE");
// create Flink Pinot Sink
srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema));
execEnv.execute();
```
For more examples, please see `src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java`.
The number of records to buffer before flushing a segment is controlled by the `segmentFlushMaxNumRecords` config. In the future, we could add other types of threshold, such as the memory usage of the buffer.
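To illustrate the record-count threshold idea, here is a minimal, self-contained sketch of a buffer that flushes a segment once it holds `segmentFlushMaxNumRecords` records. The class and method names are hypothetical and simplified for illustration; the actual connector builds and uploads a real Pinot segment on flush.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch of a record-count flush threshold.
 * Hypothetical names; not the connector's actual implementation.
 */
public class SegmentBuffer {
    private final int segmentFlushMaxNumRecords;
    private final List<String> buffer = new ArrayList<>();
    private int segmentsFlushed = 0;

    public SegmentBuffer(int segmentFlushMaxNumRecords) {
        this.segmentFlushMaxNumRecords = segmentFlushMaxNumRecords;
    }

    /** Buffer one record; flush a segment once the threshold is reached. */
    public void write(String record) {
        buffer.add(record);
        if (buffer.size() >= segmentFlushMaxNumRecords) {
            flush();
        }
    }

    private void flush() {
        // In the real connector this step would create a Pinot segment
        // from the buffered records and upload it to the controller.
        segmentsFlushed++;
        buffer.clear();
    }

    public int getSegmentsFlushed() {
        return segmentsFlushed;
    }

    public int getBufferedCount() {
        return buffer.size();
    }
}
```

A memory-based threshold would follow the same shape, replacing the size check in `write` with an estimate of the buffer's memory footprint.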