blob: 13a3fcb3c010a46f7bf8ba60273fe6a10274c2bc [file] [log] [blame]
/*
* Copyright 2019 FZI Forschungszentrum Informatik
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.streampipes.sinks.databases.jvm.influxdb;
import org.streampipes.model.DataSinkType;
import org.streampipes.model.graph.DataSinkDescription;
import org.streampipes.model.graph.DataSinkInvocation;
import org.streampipes.model.schema.PropertyScope;
import org.streampipes.sdk.builder.DataSinkBuilder;
import org.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.streampipes.sdk.extractor.DataSinkParameterExtractor;
import org.streampipes.sdk.helpers.EpRequirements;
import org.streampipes.sdk.helpers.Labels;
import org.streampipes.sdk.helpers.Locales;
import org.streampipes.sdk.helpers.SupportedFormats;
import org.streampipes.sdk.helpers.SupportedProtocols;
import org.streampipes.sdk.utils.Assets;
import org.streampipes.wrapper.standalone.ConfiguredEventSink;
import org.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
public class InfluxDbController extends StandaloneEventSinkDeclarer<InfluxDbParameters> {
private static final String DATABASE_HOST_KEY = "db_host";
private static final String DATABASE_PORT_KEY = "db_port";
private static final String DATABASE_NAME_KEY = "db_name";
private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
private static final String DATABASE_USER_KEY = "db_user";
private static final String DATABASE_PASSWORD_KEY = "db_password";
private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping";
private static final String BATCH_INTERVAL_ACTIONS_KEY = "batch_interval_actions";
private static final String MAX_FLUSH_DURATION_KEY = "max_flush_duration";
@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.streampipes.sinks.databases.jvm.influxdb")
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataSinkType.STORAGE)
.requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
EpRequirements.timestampReq(),
Labels.withId(TIMESTAMP_MAPPING_KEY),
PropertyScope.NONE).build())
.supportedFormats(SupportedFormats.jsonFormat())
.supportedProtocols(SupportedProtocols.kafka(), SupportedProtocols.jms())
.requiredTextParameter(Labels.withId(DATABASE_HOST_KEY))
.requiredIntegerParameter(Labels.withId(DATABASE_PORT_KEY), 8086)
.requiredIntegerParameter(Labels.withId(BATCH_INTERVAL_ACTIONS_KEY))
.requiredIntegerParameter(Labels.withId(MAX_FLUSH_DURATION_KEY), 2000)
.requiredTextParameter(Labels.withId(DATABASE_NAME_KEY))
.requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
.requiredTextParameter(Labels.withId(DATABASE_USER_KEY))
.requiredTextParameter(Labels.withId(DATABASE_PASSWORD_KEY))
.build();
}
@Override
public ConfiguredEventSink<InfluxDbParameters> onInvocation(DataSinkInvocation graph,
DataSinkParameterExtractor extractor) {
String hostname = extractor.singleValueParameter(DATABASE_HOST_KEY, String.class);
Integer port = extractor.singleValueParameter(DATABASE_PORT_KEY, Integer.class);
String dbName = extractor.singleValueParameter(DATABASE_NAME_KEY, String.class);
String measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
String user = extractor.singleValueParameter(DATABASE_USER_KEY, String.class);
String password = extractor.singleValueParameter(DATABASE_PASSWORD_KEY, String.class);
String timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
Integer batch_size = extractor.singleValueParameter(BATCH_INTERVAL_ACTIONS_KEY, Integer.class);
Integer flush_duration = extractor.singleValueParameter(MAX_FLUSH_DURATION_KEY, Integer.class);
InfluxDbParameters params = new InfluxDbParameters(graph,
hostname,
port,
dbName,
measureName,
user,
password,
timestampField,
batch_size,
flush_duration);
return new ConfiguredEventSink<>(params, InfluxDb::new);
}
}