blob: 70e3bbea80a205cc94c18f1983f2162ad44fa098 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.connectors.influxdb.sink;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_BUCKET;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_ORGANIZATION;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_PASSWORD;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_URL;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.INFLUXDB_USERNAME;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_BUFFER_SIZE;
import static org.apache.flink.streaming.connectors.influxdb.sink.InfluxDBSinkOptions.WRITE_DATA_POINT_CHECKPOINT;
import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBSchemaSerializer;
import org.apache.flink.streaming.connectors.influxdb.sink.writer.InfluxDBWriter;
* The @builder class for {@link InfluxDBSink} to make it easier for the users to construct a {@link
* InfluxDBSink}.
* <p>The following example shows the minimum setup to create a InfluxDBSink that uses the Long
* values from a former operator and sends it to an InfluxDB instance.
* <pre>{@code
* InfluxDBSink<Long> influxDBSink = InfluxDBSink.builder()
* .setInfluxDBSchemaSerializer(new InfluxDBSerializer())
* .setInfluxDBUrl(getUrl())
* .setInfluxDBUsername(getUsername())
* .setInfluxDBPassword(getPassword())
* .setInfluxDBBucket(getBucket())
* .setInfluxDBOrganization(getOrg())
* .build();
* }</pre>
* <p>To specify the batch size that has a significant influence on performance, one can call {@link
* #setWriteBufferSize(int)}.
* <p>Check the Java docs of each individual methods to learn more about the settings to build a
* InfluxDBSource.
public final class InfluxDBSinkBuilder<IN> {
private InfluxDBSchemaSerializer<IN> influxDBSchemaSerializer;
private String influxDBUrl;
private String influxDBUsername;
private String influxDBPassword;
private String bucketName;
private String organizationName;
private final Configuration configuration;
InfluxDBSinkBuilder() {
this.influxDBUrl = null;
this.influxDBUsername = null;
this.influxDBPassword = null;
this.bucketName = null;
this.organizationName = null;
this.influxDBSchemaSerializer = null;
this.configuration = new Configuration();
* Sets the InfluxDB url.
* @param influxDBUrl the url of the InfluxDB instance to send data to.
* @return this InfluxDBSinkBuilder.
public InfluxDBSinkBuilder<IN> setInfluxDBUrl(final String influxDBUrl) {
this.influxDBUrl = influxDBUrl;
this.configuration.setString(INFLUXDB_URL, checkNotNull(influxDBUrl));
return this;
* Sets the InfluxDB user name.
* @param influxDBUsername the user name of the InfluxDB instance.
* @return this InfluxDBSinkBuilder.
public InfluxDBSinkBuilder<IN> setInfluxDBUsername(final String influxDBUsername) {
this.influxDBUsername = influxDBUsername;
this.configuration.setString(INFLUXDB_USERNAME, checkNotNull(influxDBUsername));
return this;
* Sets the InfluxDB password.
* @param influxDBPassword the password of the InfluxDB instance.
* @return this InfluxDBSinkBuilder.
public InfluxDBSinkBuilder<IN> setInfluxDBPassword(final String influxDBPassword) {
this.influxDBPassword = influxDBPassword;
this.configuration.setString(INFLUXDB_PASSWORD, checkNotNull(influxDBPassword));
return this;
* Sets the InfluxDB bucket name.
* @param bucketName the bucket name of the InfluxDB instance to store the data in.
* @return this InfluxDBSinkBuilder.
public InfluxDBSinkBuilder<IN> setInfluxDBBucket(final String bucketName) {
this.bucketName = bucketName;
this.configuration.setString(INFLUXDB_BUCKET, checkNotNull(bucketName));
return this;
* Sets the InfluxDB organization name.
* @param organizationName the organization name of the InfluxDB instance.
* @return this InfluxDBSinkBuilder.
public InfluxDBSinkBuilder<IN> setInfluxDBOrganization(final String organizationName) {
this.organizationName = organizationName;
this.configuration.setString(INFLUXDB_ORGANIZATION, checkNotNull(organizationName));
return this;
* Sets the {@link InfluxDBSchemaSerializer serializer} of the input type IN for the
* InfluxDBSink.
* @param influxDBSchemaSerializer the serializer for the input type.
* @return this InfluxDBSourceBuilder.
public <T extends IN> InfluxDBSinkBuilder<T> setInfluxDBSchemaSerializer(
final InfluxDBSchemaSerializer<T> influxDBSchemaSerializer) {
final InfluxDBSinkBuilder<T> sinkBuilder = (InfluxDBSinkBuilder<T>) this;
sinkBuilder.influxDBSchemaSerializer = influxDBSchemaSerializer;
return sinkBuilder;
* Sets if the InfluxDBSink should write checkpoint data points to InfluxDB.
* @param shouldWrite boolean if checkpoint should be written.
* @return this InfluxDBSinkBuilder.
public InfluxDBSinkBuilder<IN> addCheckpointDataPoint(final boolean shouldWrite) {
this.configuration.setBoolean(WRITE_DATA_POINT_CHECKPOINT, shouldWrite);
return this;
* Sets the buffer size of the {@link InfluxDBWriter}. This also determines the number of {@link
* com.influxdb.client.write.Point} send to the InfluxDB instance per request.
* @param bufferSize size of the buffer.
* @return this InfluxDBSinkBuilder.
public InfluxDBSinkBuilder<IN> setWriteBufferSize(final int bufferSize) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("The buffer size should be greater than 0.");
this.configuration.setInteger(WRITE_BUFFER_SIZE, bufferSize);
return this;
* Build the {@link InfluxDBSink}.
* @return a InfluxDBSink with the settings made for this builder.
public InfluxDBSink<IN> build() {
return new InfluxDBSink<>(this.influxDBSchemaSerializer, this.configuration);
// ------------- private helpers --------------
/** Checks if the SchemaSerializer and the influxDBConfig are not null and set. */
private void sanityCheck() {
// Check required settings.
checkNotNull(this.influxDBUrl, "The InfluxDB URL is required but not provided.");
checkNotNull(this.influxDBUsername, "The InfluxDB username is required but not provided.");
checkNotNull(this.influxDBPassword, "The InfluxDB password is required but not provided.");
checkNotNull(this.bucketName, "The Bucket name is required but not provided.");
checkNotNull(this.organizationName, "The Organization name is required but not provided.");
"Serialization schema is required but not provided.");