IoTDB 提供了强大的数据订阅功能,允许用户通过订阅 API 实时获取 IoTDB 新增的数据。详细的功能定义及介绍:数据订阅
本章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: 全量接口说明
创建一个maven项目,并导入以下依赖(JDK >= 1.8, Maven >= 3.6)
<dependencies> <dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-session</artifactId> <!-- 版本号与数据库版本号相同 --> <version>${project.version}</version> </dependency> </dependencies>
import java.util.Optional; import java.util.Properties; import java.util.Set; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.session.subscription.SubscriptionSession; import org.apache.iotdb.session.subscription.model.Topic; public class DataConsumerExample { public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { try (SubscriptionSession session = new SubscriptionSession("127.0.0.1", 6667, "root", "TimechoDB@2021", 67108864)) { //V2.0.6.x 之前默认密码为root // 1. open session session.open(); // 2. create a topic of all data Properties sessionConfig = new Properties(); sessionConfig.put(TopicConstant.PATH_KEY, "root.**"); session.createTopic("allData", sessionConfig); // 3. show all topics Set<Topic> topics = session.getTopics(); System.out.println(topics); // 4. show a specific topic Optional<Topic> allData = session.getTopic("allData"); System.out.println(allData.get()); } } }
import java.io.IOException; import java.util.List; import java.util.Properties; import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType; import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; import org.apache.tsfile.read.common.RowRecord; public class DataConsumerExample { public static void main(String[] args) throws IOException { // 5. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root"); consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "TimechoDB@2021"); //V2.0.6.x 之前默认密码为root try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { pullConsumer.open(); pullConsumer.subscribe("topic_all"); while (true) { List<SubscriptionMessage> messages = pullConsumer.poll(10000); for (final SubscriptionMessage message : messages) { final short messageType = message.getMessageType(); if (SubscriptionMessageType.isValidatedMessageType(messageType)) { for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { while (dataSet.hasNext()) { final RowRecord record = dataSet.next(); System.out.println(record); } } } } } } } }
前提:需要被消费的topic的格式为TsfileHandler类型,举例:create topic topic_all_tsfile with ('path'='root.**','format'='TsFileHandler')
import java.io.IOException; import java.util.List; import java.util.Properties; import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; public class DataConsumerExample { public static void main(String[] args) throws IOException { // 1. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root"); consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "TimechoDB@2021");//V2.0.6.x 之前默认密码为root consumerConfig.put(ConsumerConstant.FILE_SAVE_DIR_KEY, "/Users/iotdb/Downloads"); try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { pullConsumer.open(); pullConsumer.subscribe("topic_all_tsfile"); while (true) { List<SubscriptionMessage> messages = pullConsumer.poll(10000); for (final SubscriptionMessage message : messages) { message.getTsFileHandler().copyFile("/Users/iotdb/Downloads/1.tsfile"); } } } } }
可通过Properties参数对象设置消费者相关参数,具体参数如下。
| 参数 | 是否必填(默认值) | 参数含义 |
|---|---|---|
| host | optional: 127.0.0.1 | String: IoTDB 中某 DataNode 的 RPC host |
| port | optional: 6667 | Integer: IoTDB 中某 DataNode 的 RPC port |
| node-urls | optional: 127.0.0.1:6667 | List<String>: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的并集构成新的 node-urls 应用 |
| username | optional: root | String: IoTDB 中 DataNode 的用户名 |
| password | optional: TimechoDB@2021 //V2.0.6.x 之前默认密码为root | String: IoTDB 中 DataNode 的密码 |
| groupId | optional | String: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同 |
| consumerId | optional | String: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同 |
| heartbeatIntervalMs | optional: 30000 (min: 1000) | Long: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔 |
| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | Long: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔 |
| fileSaveDir | optional: Paths.get(System.getProperty(“user.dir”), “iotdb-subscription”).toString() | String: consumer 订阅出的 TsFile 文件临时存放的目录路径 |
| fileSaveFsync | optional: false | Boolean: consumer 订阅 TsFile 的过程中是否主动调用 fsync |
SubscriptionPushConsumer 中的特殊配置:
| 参数 | 是否必填(默认值) | 参数含义 |
|---|---|---|
| ackStrategy | optional: ACKStrategy.AFTER_CONSUME | 消费进度的确认机制包含以下选项:ACKStrategy.BEFORE_CONSUME(当 consumer 收到数据时立刻提交消费进度,onReceive 前)ACKStrategy.AFTER_CONSUME(当 consumer 消费完数据再去提交消费进度,onReceive 后) |
| consumeListener | optional | 消费数据的回调函数,需实现 ConsumeListener 接口,定义消费 SessionDataSetsHandler 和 TsFileHandler 形式数据的处理逻辑 |
| autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为毫秒 |
| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为毫秒 |
SubscriptionPullConsumer 中的特殊配置:
| 参数 | 是否必填(默认值) | 参数含义 |
|---|---|---|
| autoCommit | optional: true | Boolean: 是否自动提交消费进度如果此参数设置为 false,则需要调用 commit 方法来手动提交消费进度 |
| autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为毫秒仅当 autoCommit 参数为 true 的时候才会生效 |
| 函数名 | 说明 | 参数 |
|---|---|---|
open() | 打开消费者连接,启动消息消费。如果 autoCommit 启用,会启动自动提交工作器。 | 无 |
close() | 关闭消费者连接。如果 autoCommit 启用,会在关闭前提交所有未提交的消息。 | 无 |
poll(final Duration timeout) | 拉取消息,指定超时时间。 | timeout : 拉取的超时时间。 |
poll(final long timeoutMs) | 拉取消息,指定超时时间(毫秒)。 | timeoutMs : 超时时间,单位为毫秒。 |
poll(final Set<String> topicNames, final Duration timeout) | 拉取指定主题的消息,指定超时时间。 | topicNames : 要拉取的主题集合。timeout: 超时时间。 |
poll(final Set<String> topicNames, final long timeoutMs) | 拉取指定主题的消息,指定超时时间(毫秒)。 | topicNames : 要拉取的主题集合。timeoutMs: 超时时间,单位为毫秒。 |
commitSync(final SubscriptionMessage message) | 同步提交单条消息。 | message : 需要提交的消息对象。 |
commitSync(final Iterable<SubscriptionMessage> messages) | 同步提交多条消息。 | messages : 需要提交的消息集合。 |
commitAsync(final SubscriptionMessage message) | 异步提交单条消息。 | message : 需要提交的消息对象。 |
commitAsync(final Iterable<SubscriptionMessage> messages) | 异步提交多条消息。 | messages : 需要提交的消息集合。 |
commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback) | 异步提交单条消息并指定回调函数。 | message : 需要提交的消息对象。callback : 异步提交完成后的回调函数。 |
commitAsync(final Iterable<SubscriptionMessage> messages, final AsyncCommitCallback callback) | 异步提交多条消息并指定回调函数。 | messages : 需要提交的消息集合。callback : 异步提交完成后的回调函数。 |
| 函数名 | 说明 | 参数 |
|---|---|---|
open() | 打开消费者连接,启动消息消费,提交自动轮询工作器。 | 无 |
close() | 关闭消费者连接,停止消息消费。 | 无 |
toString() | 返回消费者对象的核心配置信息。 | 无 |
coreReportMessage() | 获取消费者核心配置的键值对表示形式。 | 无 |
allReportMessage() | 获取消费者所有配置的键值对表示形式。 | 无 |
buildPushConsumer() | 通过 Builder 构建 SubscriptionPushConsumer 实例。 | 无 |
ackStrategy(final AckStrategy ackStrategy) | 配置消费者的消息确认策略。 | ackStrategy: 指定的消息确认策略。 |
consumeListener(final ConsumeListener consumeListener) | 配置消费者的消息消费逻辑。 | consumeListener: 消费者接收消息时的处理逻辑。 |
autoPollIntervalMs(final long autoPollIntervalMs) | 配置自动轮询的时间间隔。 | autoPollIntervalMs : 自动轮询的间隔时间,单位为毫秒。 |
autoPollTimeoutMs(final long autoPollTimeoutMs) | 配置自动轮询的超时时间。 | autoPollTimeoutMs: 自动轮询的超时时间,单位为毫秒。 |