blob: 3473752a6693f75e493a9623f7ea54b8f4b555fc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.integration;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.trigger.sink.forward.ForwardEvent;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.sun.net.httpserver.HttpServer;
import io.moquette.BrokerConstants;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.InterceptHandler;
import io.moquette.interception.messages.InterceptPublishMessage;
import org.apache.http.HttpStatus;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.fail;
public class IoTDBTriggerForwardIT {
private volatile long count = 0;
private volatile Exception exception = null;
private HttpServer httpServer;
private Server mqttServer;
private final Gson gson = new Gson();
private final AtomicLong resultCount = new AtomicLong(0);
private final Thread dataGenerator =
new Thread() {
@Override
public void run() {
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
do {
++count;
statement.execute(
String.format(
"insert into root.vehicle.a.b.c.d1(timestamp,s1,s2,s3,s4,s5,s6) values(%d,%d,%d,%d,%d,%s,'%d')",
count, count, count, count, count, count % 2 == 0 ? "true" : "false", count));
} while (!isInterrupted());
} catch (Exception e) {
exception = e;
}
}
};
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
createTimeseries();
Class.forName(Config.JDBC_DRIVER_NAME);
}
@After
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
}
@Test
public void testForwardHTTPTrigger() throws InterruptedException {
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
startHTTPService();
statement.execute(
"create trigger trigger_forward_http_before before insert on root.vehicle.a.b.c.d1.s1 "
+ "as 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger' "
+ "with ('protocol' = 'http', 'endpoint' = 'http://127.0.0.1:8080/')");
statement.execute(
"create trigger trigger_forward_http_after after insert on root.vehicle.a.b.c.d1.s2 "
+ "as 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger' "
+ "with ('protocol' = 'http', 'endpoint' = 'http://127.0.0.1:8080/')");
startDataGenerator();
waitCountIncreaseBy(500);
stopDataGenerator();
// ensure no exception occurs when inserting data
if (exception != null) {
fail(exception.getMessage());
}
await().atMost(1, TimeUnit.MINUTES).until(() -> 2 * count == resultCount.get());
if (exception != null) {
fail(exception.getMessage());
}
} catch (Exception e) {
fail(e.getMessage());
} finally {
if (httpServer != null) {
httpServer.stop(0);
}
stopDataGenerator();
}
}
@Test
public void testForwardMQTTTrigger() throws InterruptedException {
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
startMQTTService();
statement.execute(
"create trigger trigger_forward_mqtt_before before insert on root.vehicle.a.b.c.d1.s3 "
+ "as 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger' "
+ "with ('protocol' = 'mqtt', 'host' = '127.0.0.1', 'port' = '1884',"
+ " 'username' = 'root', 'password' = 'root', 'topic' = 'mqtt-test')");
statement.execute(
"create trigger trigger_forward_mqtt_after after insert on root.vehicle.a.b.c.d1.s4 "
+ "as 'org.apache.iotdb.db.engine.trigger.builtin.ForwardTrigger' "
+ "with ('protocol' = 'mqtt', 'host' = '127.0.0.1', 'port' = '1884',"
+ " 'username' = 'root', 'password' = 'root', 'topic' = 'mqtt-test')");
startDataGenerator();
waitCountIncreaseBy(500);
stopDataGenerator();
// ensure no exception occurs when inserting data
if (exception != null) {
fail(exception.getMessage());
}
await().atMost(1, TimeUnit.MINUTES).until(() -> 2 * count == resultCount.get());
if (exception != null) {
fail(exception.getMessage());
}
} catch (SQLException | InterruptedException | IOException e) {
fail(e.getMessage());
} finally {
stopDataGenerator();
if (mqttServer != null) {
mqttServer.stopServer();
}
}
}
private void createTimeseries() throws MetadataException {
IoTDB.schemaProcessor.createTimeseries(
new PartialPath("root.vehicle.a.b.c.d1.s1"),
TSDataType.INT32,
TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED,
null);
IoTDB.schemaProcessor.createTimeseries(
new PartialPath("root.vehicle.a.b.c.d1.s2"),
TSDataType.INT64,
TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED,
null);
IoTDB.schemaProcessor.createTimeseries(
new PartialPath("root.vehicle.a.b.c.d1.s3"),
TSDataType.FLOAT,
TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED,
null);
IoTDB.schemaProcessor.createTimeseries(
new PartialPath("root.vehicle.a.b.c.d1.s4"),
TSDataType.DOUBLE,
TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED,
null);
IoTDB.schemaProcessor.createTimeseries(
new PartialPath("root.vehicle.a.b.c.d1.s5"),
TSDataType.BOOLEAN,
TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED,
null);
IoTDB.schemaProcessor.createTimeseries(
new PartialPath("root.vehicle.a.b.c.d1.s6"),
TSDataType.TEXT,
TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED,
null);
}
private void startDataGenerator() {
dataGenerator.start();
}
private void stopDataGenerator() throws InterruptedException {
if (!dataGenerator.isInterrupted()) {
dataGenerator.interrupt();
}
dataGenerator.join();
}
private void waitCountIncreaseBy(final long increment) throws InterruptedException {
final long previous = count;
while (count - previous < increment) {
Thread.sleep(100);
}
}
private void startHTTPService() throws IOException {
httpServer = HttpServer.create(new InetSocketAddress(8080), 0);
httpServer.createContext(
"/",
exchange -> {
String entity = "";
try {
InputStream in = exchange.getRequestBody();
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] b = new byte[1024 * 8];
int len;
while ((len = in.read(b)) != -1) {
out.write(b, 0, len);
}
entity = out.toString();
} catch (Exception e) {
e.printStackTrace();
}
if (!checkPayload(entity)) {
exception = new IOException("HTTP forward payload error");
}
JsonArray receiveData = gson.fromJson(entity, JsonArray.class);
resultCount.addAndGet(receiveData.size());
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
});
httpServer.start();
}
private void startMQTTService() throws IOException {
Properties properties = new Properties();
properties.setProperty(BrokerConstants.HOST_PROPERTY_NAME, "0.0.0.0");
properties.setProperty(BrokerConstants.PORT_PROPERTY_NAME, "1884");
properties.setProperty(BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE, "1");
IConfig config = new MemoryConfig(properties);
List<InterceptHandler> handlers = Lists.newArrayList(new ForwardTestHandler());
mqttServer = new Server();
mqttServer.startServer(config, handlers);
}
private class ForwardTestHandler extends AbstractInterceptHandler {
@Override
public String getID() {
return "forward-test-handler";
}
@Override
public void onPublish(InterceptPublishMessage msg) {
String payload = msg.getPayload().toString(StandardCharsets.UTF_8);
if (!checkPayload(payload)) {
exception = new IOException("MQTT forward payload error");
}
JsonArray receiveData = gson.fromJson(payload, JsonArray.class);
resultCount.addAndGet(receiveData.size());
}
}
private boolean checkPayload(String payload) {
return payload.matches(ForwardEvent.PAYLOADS_FORMATTER_REGEX);
}
}