blob: eeafb7a427db070d473bffc62c1e17db37ce5077 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.influxdb;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
/**
* Configuration for InfluxDB.
*/
public class InfluxDBConfig implements Serializable {
private static final long serialVersionUID = 1L;
private static final int DEFAULT_BATCH_ACTIONS = 2000;
private static final int DEFAULT_FLUSH_DURATION = 100;
private String url;
private String username;
private String password;
private String database;
private int batchActions;
private int flushDuration;
private TimeUnit flushDurationTimeUnit;
private boolean enableGzip;
private boolean createDatabase;
public InfluxDBConfig(InfluxDBConfig.Builder builder) {
Preconditions.checkArgument(builder != null, "InfluxDBConfig builder can not be null");
this.url = Preconditions.checkNotNull(builder.getUrl(), "host can not be null");
this.username = Preconditions.checkNotNull(builder.getUsername(), "username can not be null");
this.password = Preconditions.checkNotNull(builder.getPassword(), "password can not be null");
this.database = Preconditions.checkNotNull(builder.getDatabase(), "database name can not be null");
this.batchActions = builder.getBatchActions();
this.flushDuration = builder.getFlushDuration();
this.flushDurationTimeUnit = builder.getFlushDurationTimeUnit();
this.enableGzip = builder.isEnableGzip();
this.createDatabase = builder.isCreateDatabase();
}
public String getUrl() {
return url;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public String getDatabase() {
return database;
}
public int getBatchActions() {
return batchActions;
}
public int getFlushDuration() {
return flushDuration;
}
public TimeUnit getFlushDurationTimeUnit() {
return flushDurationTimeUnit;
}
public boolean isEnableGzip() {
return enableGzip;
}
public boolean isCreateDatabase() { return createDatabase; }
/**
* Creates a new {@link InfluxDBConfig.Builder} instance.
* <p/>
* This is a convenience method for {@code new InfluxDBConfig.Builder()}.
*
* @param url the url to connect to
* @param username the username which is used to authorize against the influxDB instance
* @param password the password for the username which is used to authorize against the influxDB
* instance
* @param database the name of the database to write
* @return the new InfluxDBConfig builder.
*/
public static Builder builder(String url, String username, String password, String database) {
return new Builder(url, username, password, database);
}
/**
* A builder used to create a build an instance of a InfluxDBConfig.
*/
public static class Builder {
private String url;
private String username;
private String password;
private String database;
private int batchActions = DEFAULT_BATCH_ACTIONS;
private int flushDuration = DEFAULT_FLUSH_DURATION;
private TimeUnit flushDurationTimeUnit = TimeUnit.MILLISECONDS;
private boolean enableGzip = false;
private boolean createDatabase = false;
/**
* Creates a builder
*
* @param url the url to connect to
* @param username the username which is used to authorize against the influxDB instance
* @param password the password for the username which is used to authorize against the influxDB
* instance
* @param database the name of the database to write
*/
public Builder(String url, String username, String password, String database) {
this.url = url;
this.username = username;
this.password = password;
this.database = database;
}
/**
* Sets url.
*
* @param url the url to connect to
* @return this Builder to use it fluent
*/
public InfluxDBConfig.Builder url(String url) {
this.url = url;
return this;
}
/**
* Sets username.
*
* @param username the username which is used to authorize against the influxDB instance
* @return this Builder to use it fluent
*/
public InfluxDBConfig.Builder username(String username) {
this.username = username;
return this;
}
/**
* Sets password.
*
* @param password the password for the username which is used to authorize against the influxDB
* instance
* @return this Builder to use it fluent
*/
public InfluxDBConfig.Builder password(String password) {
this.password = password;
return this;
}
/**
* Sets database name.
*
* @param database the name of the database to write
* @return this Builder to use it fluent
*/
public InfluxDBConfig.Builder database(String database) {
this.database = database;
return this;
}
/**
* Sets when to flush a new bulk request based on the number of batch actions currently added.
* Defaults to <tt>DEFAULT_BATCH_ACTIONS</tt>. Can be set to <tt>-1</tt> to disable it.
*
* @param batchActions number of Points written after which a write must happen.
* @return this Builder to use it fluent
*/
public InfluxDBConfig.Builder batchActions(int batchActions) {
this.batchActions = batchActions;
return this;
}
/**
* Sets a flush interval flushing *any* bulk actions pending if the interval passes.
*
* @param flushDuration the flush duration
* @param flushDurationTimeUnit the TimeUnit of the flush duration
* @return this Builder to use it fluent
*/
public Builder flushDuration(int flushDuration, TimeUnit flushDurationTimeUnit) {
this.flushDuration = flushDuration;
this.flushDurationTimeUnit = flushDurationTimeUnit;
return this;
}
/**
* Enable Gzip compress for http request body.
*
* @param enableGzip the enableGzip value
* @return this Builder to use it fluent
*/
public InfluxDBConfig.Builder enableGzip(boolean enableGzip) {
this.enableGzip = enableGzip;
return this;
}
/**
* Make InfluxDb sink create new database
*
* @param createDatabase createDatabase switch value
* @return this Builder to use it fluent
*/
public InfluxDBConfig.Builder createDatabase(boolean createDatabase) {
this.createDatabase = createDatabase;
return this;
}
/**
* Builds InfluxDBConfig.
*
* @return the InfluxDBConfig instance.
*/
public InfluxDBConfig build() {
return new InfluxDBConfig(this);
}
public String getUrl() {
return url;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public String getDatabase() {
return database;
}
public int getBatchActions() {
return batchActions;
}
public int getFlushDuration() {
return flushDuration;
}
public TimeUnit getFlushDurationTimeUnit() {
return flushDurationTimeUnit;
}
public boolean isEnableGzip() {
return enableGzip;
}
public boolean isCreateDatabase() { return createDatabase; }
}
}