| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.influxdb; |
| |
| import org.apache.iotdb.influxdb.protocol.constant.InfluxDBConstant; |
| import org.apache.iotdb.influxdb.protocol.dto.SessionPoint; |
| import org.apache.iotdb.influxdb.protocol.impl.IoTDBInfluxDBService; |
| import org.apache.iotdb.influxdb.protocol.util.ParameterUtils; |
| import org.apache.iotdb.session.Session; |
| |
| import org.influxdb.BatchOptions; |
| import org.influxdb.InfluxDB; |
| import org.influxdb.dto.BatchPoints; |
| import org.influxdb.dto.Point; |
| import org.influxdb.dto.Pong; |
| import org.influxdb.dto.Query; |
| import org.influxdb.dto.QueryResult; |
| import org.influxdb.impl.TimeUtil; |
| |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.List; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.BiConsumer; |
| import java.util.function.Consumer; |
| |
| public class IoTDBInfluxDB implements InfluxDB { |
| |
| private final IoTDBInfluxDBService influxDBService; |
| |
| public IoTDBInfluxDB(String url, String userName, String password) { |
| URI uri; |
| try { |
| uri = new URI(url); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException("Unable to parse url: " + url, e); |
| } |
| influxDBService = new IoTDBInfluxDBService(uri.getHost(), uri.getPort(), userName, password); |
| } |
| |
| public IoTDBInfluxDB(String host, int rpcPort, String userName, String password) { |
| influxDBService = new IoTDBInfluxDBService(host, rpcPort, userName, password); |
| } |
| |
| public IoTDBInfluxDB(Session.Builder builder) { |
| this(builder.build()); |
| } |
| |
| public IoTDBInfluxDB(Session session) { |
| SessionPoint sessionPoint = new SessionPoint(session); |
| influxDBService = |
| new IoTDBInfluxDBService( |
| sessionPoint.getHost(), |
| sessionPoint.getRpcPort(), |
| sessionPoint.getUsername(), |
| sessionPoint.getPassword()); |
| } |
| |
| @Override |
| public void write(final Point point) { |
| write(null, null, point); |
| } |
| |
| @Override |
| public void write(final String database, final String retentionPolicy, final Point point) { |
| TimeUnit precision = TimeUnit.NANOSECONDS; |
| // Get the precision of point in influxdb by reflection |
| for (java.lang.reflect.Field reflectField : point.getClass().getDeclaredFields()) { |
| reflectField.setAccessible(true); |
| try { |
| if (reflectField.getType().getName().equalsIgnoreCase("java.util.concurrent.TimeUnit") |
| && reflectField.getName().equalsIgnoreCase("precision")) { |
| precision = (TimeUnit) reflectField.get(point); |
| break; |
| } |
| } catch (IllegalAccessException e) { |
| throw new IllegalArgumentException(e.getMessage()); |
| } |
| } |
| BatchPoints batchPoints = |
| BatchPoints.database(database) |
| .retentionPolicy(retentionPolicy) |
| .precision(precision) |
| .build(); |
| batchPoints.point(point); |
| write(batchPoints); |
| } |
| |
| @Override |
| public void write(final int udpPort, final Point point) { |
| write(null, null, point); |
| } |
| |
| @Override |
| public void write(final BatchPoints batchPoints) { |
| influxDBService.writePoints( |
| batchPoints.getDatabase(), |
| batchPoints.getRetentionPolicy(), |
| TimeUtil.toTimePrecision(batchPoints.getPrecision()), |
| batchPoints.getConsistency().value(), |
| batchPoints.lineProtocol()); |
| } |
| |
| @Override |
| public void writeWithRetry(final BatchPoints batchPoints) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void write(final String records) { |
| write(null, null, null, null, records); |
| } |
| |
| @Override |
| public void write(final List<String> records) { |
| write(String.join("\n", records)); |
| } |
| |
| @Override |
| public void write( |
| final String database, |
| final String retentionPolicy, |
| final ConsistencyLevel consistency, |
| final String records) { |
| write(database, retentionPolicy, consistency, null, records); |
| } |
| |
| @Override |
| public void write( |
| final String database, |
| final String retentionPolicy, |
| final ConsistencyLevel consistency, |
| final TimeUnit precision, |
| final String records) { |
| influxDBService.writePoints( |
| database, |
| retentionPolicy, |
| consistency.value(), |
| precision == null ? "" : TimeUtil.toTimePrecision(precision), |
| records); |
| } |
| |
| @Override |
| public void write( |
| final String database, |
| final String retentionPolicy, |
| final ConsistencyLevel consistency, |
| final List<String> records) { |
| write(database, retentionPolicy, consistency, null, String.join("\n", records)); |
| } |
| |
| @Override |
| public void write( |
| final String database, |
| final String retentionPolicy, |
| final ConsistencyLevel consistency, |
| final TimeUnit precision, |
| final List<String> records) { |
| write(database, retentionPolicy, consistency, precision, String.join("\n", records)); |
| } |
| |
| @Override |
| public void write(final int udpPort, final String records) { |
| write(records); |
| } |
| |
| @Override |
| public void write(final int udpPort, final List<String> records) { |
| write(String.join("\n", records)); |
| } |
| |
| @Override |
| public QueryResult query(final Query queryReq) { |
| return influxDBService.query(queryReq); |
| } |
| |
| @Override |
| public void createDatabase(final String name) { |
| ParameterUtils.checkNonEmptyString(name, "database name"); |
| influxDBService.createDatabase(name); |
| } |
| |
| @Override |
| public void deleteDatabase(final String name) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public InfluxDB setDatabase(final String database) { |
| ParameterUtils.checkNonEmptyString(database, "database name"); |
| influxDBService.setDatabase(database); |
| return this; |
| } |
| |
| @Override |
| public void query( |
| final Query query, |
| final Consumer<QueryResult> onSuccess, |
| final Consumer<Throwable> onFailure) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void query(Query query, int chunkSize, Consumer<QueryResult> onNext) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void query(Query query, int chunkSize, BiConsumer<Cancellable, QueryResult> onNext) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void query(Query query, int chunkSize, Consumer<QueryResult> onNext, Runnable onComplete) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void query( |
| Query query, |
| int chunkSize, |
| BiConsumer<Cancellable, QueryResult> onNext, |
| Runnable onComplete) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void query( |
| Query query, |
| int chunkSize, |
| BiConsumer<Cancellable, QueryResult> onNext, |
| Runnable onComplete, |
| Consumer<Throwable> onFailure) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public QueryResult query(Query query, TimeUnit timeUnit) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public List<String> describeDatabases() { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public boolean databaseExists(final String name) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void flush() { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void close() { |
| influxDBService.close(); |
| } |
| |
| @Override |
| public InfluxDB setConsistency(final ConsistencyLevel consistencyLevel) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public InfluxDB setRetentionPolicy(final String retentionPolicy) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void createRetentionPolicy( |
| final String rpName, |
| final String database, |
| final String duration, |
| final String shardDuration, |
| final int replicationFactor, |
| final boolean isDefault) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void createRetentionPolicy( |
| final String rpName, |
| final String database, |
| final String duration, |
| final int replicationFactor, |
| final boolean isDefault) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void createRetentionPolicy( |
| final String rpName, |
| final String database, |
| final String duration, |
| final String shardDuration, |
| final int replicationFactor) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void dropRetentionPolicy(final String rpName, final String database) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public InfluxDB setLogLevel(LogLevel logLevel) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public InfluxDB enableGzip() { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public InfluxDB disableGzip() { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public boolean isGzipEnabled() { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public InfluxDB enableBatch() { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public InfluxDB enableBatch(BatchOptions batchOptions) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public InfluxDB enableBatch( |
| final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public InfluxDB enableBatch( |
| final int actions, |
| final int flushDuration, |
| final TimeUnit flushDurationTimeUnit, |
| final ThreadFactory threadFactory) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public InfluxDB enableBatch( |
| int actions, |
| int flushDuration, |
| TimeUnit flushDurationTimeUnit, |
| ThreadFactory threadFactory, |
| BiConsumer<Iterable<Point>, Throwable> exceptionHandler, |
| ConsistencyLevel consistency) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public InfluxDB enableBatch( |
| final int actions, |
| final int flushDuration, |
| final TimeUnit flushDurationTimeUnit, |
| final ThreadFactory threadFactory, |
| final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public void disableBatch() { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public boolean isBatchEnabled() { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public Pong ping() { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| |
| @Override |
| public String version() { |
| throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED); |
| } |
| } |