blob: 929c6576e2ee7c534d27bc5764b71c7768d37bfd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.subscription.it.dual;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.session.subscription.SubscriptionMessage;
import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
import org.apache.tsfile.write.record.Tablet;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.Statement;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2Subscription.class})
public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSubscriptionTopicIT.class);
@Test
public void testTopicPathSubscription() throws Exception {
// Insert some historical data on sender
try (final ISession session = senderEnv.getSessionConnection()) {
for (int i = 0; i < 100; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s) values (%s, 1)", i));
session.executeNonQueryStatement(
String.format("insert into root.db.d2(time, s) values (%s, 1)", i));
session.executeNonQueryStatement(
String.format("insert into root.db.d3(time, t) values (%s, 1)", i));
session.executeNonQueryStatement(
String.format("insert into root.db.t1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
// Create topic on sender
final String topicName = "topic1";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
final Properties config = new Properties();
config.put(TopicConstant.PATH_KEY, "root.db.*.s");
session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
// Subscribe on sender and insert on receiver
final AtomicBoolean isClosed = new AtomicBoolean(false);
final Thread thread =
new Thread(
() -> {
try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
.consumerId("c1")
.consumerGroupId("cg1")
.autoCommit(false)
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection()) {
consumer.open();
consumer.subscribe(topicName);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
} catch (final InterruptedException e) {
break;
}
final List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(10000));
if (messages.isEmpty()) {
continue;
}
for (final SubscriptionMessage message : messages) {
final SubscriptionSessionDataSets payload =
(SubscriptionSessionDataSets) message.getPayload();
for (final Iterator<Tablet> it = payload.tabletIterator(); it.hasNext(); ) {
final Tablet tablet = it.next();
session.insertTablet(tablet);
}
}
consumer.commitSync(messages);
}
consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid fail
} finally {
LOGGER.info("consumer exiting...");
}
});
thread.start();
// Check data on receiver
try {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
.pollDelay(1, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.atMost(120, TimeUnit.SECONDS)
.untilAsserted(
() ->
TestUtils.assertSingleResultSetEqual(
TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"),
new HashMap<String, String>() {
{
put("count(root.db.d1.s)", "100");
put("count(root.db.d2.s)", "100");
}
}));
}
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
isClosed.set(true);
thread.join();
}
}
@Test
public void testTopicTimeSubscription() throws Exception {
// Insert some historical data on sender
final long currentTime = System.currentTimeMillis();
try (final ISession session = senderEnv.getSessionConnection()) {
for (int i = 0; i < 100; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s) values (%s, 1)", i));
session.executeNonQueryStatement(
String.format("insert into root.db.d2(time, s) values (%s, 1)", currentTime + i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
// Create topic on sender
final String topicName = "topic2";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
final Properties config = new Properties();
config.put(TopicConstant.START_TIME_KEY, currentTime);
session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
// Subscribe on sender and insert on receiver
final AtomicBoolean isClosed = new AtomicBoolean(false);
final Thread thread =
new Thread(
() -> {
try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
.consumerId("c1")
.consumerGroupId("cg1")
.autoCommit(false)
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection()) {
consumer.open();
consumer.subscribe(topicName);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
} catch (final InterruptedException e) {
break;
}
final List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(10000));
if (messages.isEmpty()) {
continue;
}
for (final SubscriptionMessage message : messages) {
final SubscriptionSessionDataSets payload =
(SubscriptionSessionDataSets) message.getPayload();
for (final Iterator<Tablet> it = payload.tabletIterator(); it.hasNext(); ) {
final Tablet tablet = it.next();
session.insertTablet(tablet);
}
}
consumer.commitSync(messages);
}
consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
} finally {
LOGGER.info("consumer exiting...");
}
});
thread.start();
// Check data on receiver
try {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
.pollDelay(1, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.atMost(120, TimeUnit.SECONDS)
.untilAsserted(
() ->
TestUtils.assertSingleResultSetEqual(
TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"),
new HashMap<String, String>() {
{
put("count(root.db.d2.s)", "100");
}
}));
}
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
isClosed.set(true);
thread.join();
}
}
@Test
public void testTopicProcessorSubscription() throws Exception {
// Insert some history data on sender
try (final ISession session = senderEnv.getSessionConnection()) {
session.executeNonQueryStatement(
"insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2), (2000, 3), (2500, 4), (3000, 5)");
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
// Create topic
final String topicName = "topic3";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
final Properties config = new Properties();
config.put("processor", "tumbling-time-sampling-processor");
config.put("processor.tumbling-time.interval-seconds", "1");
config.put("processor.down-sampling.split-file", "true");
session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
// Subscribe on sender and insert on receiver
final AtomicBoolean isClosed = new AtomicBoolean(false);
final Thread thread =
new Thread(
() -> {
try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
.consumerId("c1")
.consumerGroupId("cg1")
.autoCommit(false)
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection()) {
consumer.open();
consumer.subscribe(topicName);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
} catch (final InterruptedException e) {
break;
}
final List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(10000));
if (messages.isEmpty()) {
continue;
}
for (final SubscriptionMessage message : messages) {
final SubscriptionSessionDataSets payload =
(SubscriptionSessionDataSets) message.getPayload();
for (final Iterator<Tablet> it = payload.tabletIterator(); it.hasNext(); ) {
final Tablet tablet = it.next();
session.insertTablet(tablet);
}
}
consumer.commitSync(messages);
}
consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
} finally {
LOGGER.info("consumer exiting...");
}
});
thread.start();
// Check data on receiver
final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1000,1.0,");
expectedResSet.add("2000,3.0,");
expectedResSet.add("3000,5.0,");
try {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
.pollDelay(1, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.atMost(120, TimeUnit.SECONDS)
.untilAsserted(
() ->
TestUtils.assertResultSetEqual(
TestUtils.executeQueryWithRetry(statement, "select * from root.**"),
"Time,root.db.d1.at1,",
expectedResSet));
}
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
isClosed.set(true);
thread.join();
}
}
@Test
public void testTopicNameWithBackQuote() throws Exception {
// Insert some historical data on sender
try (final ISession session = senderEnv.getSessionConnection()) {
for (int i = 0; i < 100; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s) values (%s, 1)", i));
}
for (int i = 100; i < 200; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s) values (%s, 1)", i));
}
for (int i = 200; i < 300; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
// Create topic on sender
final String topic1 = "`topic1`";
final String topic2 = "`'topic2'`";
final String topic3 = "`\"topic3\"`";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
{
final Properties config = new Properties();
config.put(TopicConstant.START_TIME_KEY, 0);
config.put(TopicConstant.END_TIME_KEY, 99);
session.createTopic(topic1, config);
}
{
final Properties config = new Properties();
config.put(TopicConstant.START_TIME_KEY, 100);
config.put(TopicConstant.END_TIME_KEY, 199);
session.createTopic(topic2, config);
}
{
final Properties config = new Properties();
config.put(TopicConstant.START_TIME_KEY, 200);
config.put(TopicConstant.END_TIME_KEY, 299);
session.createTopic(topic3, config);
}
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
// Subscribe on sender and insert on receiver
final Set<String> topics = new HashSet<>();
topics.add(topic1);
topics.add(topic2);
topics.add(topic3);
final AtomicBoolean isClosed = new AtomicBoolean(false);
final Thread thread =
new Thread(
() -> {
try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
.consumerId("c1")
.consumerGroupId("cg1")
.autoCommit(false)
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection()) {
consumer.open();
consumer.subscribe(topics);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
} catch (final InterruptedException e) {
break;
}
final List<SubscriptionMessage> messages =
consumer.poll(topics, Duration.ofMillis(10000));
if (messages.isEmpty()) {
continue;
}
for (final SubscriptionMessage message : messages) {
final SubscriptionSessionDataSets payload =
(SubscriptionSessionDataSets) message.getPayload();
for (final Iterator<Tablet> it = payload.tabletIterator(); it.hasNext(); ) {
final Tablet tablet = it.next();
session.insertTablet(tablet);
}
}
consumer.commitSync(messages);
}
consumer.unsubscribe(topics);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
} finally {
LOGGER.info("consumer exiting...");
}
});
thread.start();
// Check data on receiver
try {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
.pollDelay(1, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.atMost(120, TimeUnit.SECONDS)
.untilAsserted(
() ->
TestUtils.assertSingleResultSetEqual(
TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"),
new HashMap<String, String>() {
{
put("count(root.db.d1.s)", "300");
}
}));
}
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
isClosed.set(true);
thread.join();
}
}
}