IoTDB provides powerful data subscription functionality, allowing users to access newly added data from IoTDB in real-time through subscription APIs. For detailed functional definitions and introductions:Data subscription
This section is used to illustrate the core development process and does not demonstrate all parameters and interfaces. For a comprehensive understanding of all features and parameters, please refer to: Java Native API
Create a Maven project and import the following dependencies(JDK >= 1.8, Maven >= 3.6)
<dependencies> <dependency> <groupId>org.apache.iotdb</groupId> <artifactId>iotdb-session</artifactId> <!-- The version number is the same as the database version number --> <version>${project.version}</version> </dependency> </dependencies>
import java.util.Optional; import java.util.Properties; import java.util.Set; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.session.subscription.SubscriptionSession; import org.apache.iotdb.session.subscription.model.Topic; public class DataConsumerExample { public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { try (SubscriptionSession session = new SubscriptionSession("127.0.0.1", 6667, "root", "root", 67108864)) { // 1. open session session.open(); // 2. create a topic of all data Properties sessionConfig = new Properties(); sessionConfig.put(TopicConstant.PATH_KEY, "root.**"); session.createTopic("allData", sessionConfig); // 3. show all topics Set<Topic> topics = session.getTopics(); System.out.println(topics); // 4. show a specific topic Optional<Topic> allData = session.getTopic("allData"); System.out.println(allData.get()); } } }
import java.io.IOException; import java.util.List; import java.util.Properties; import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType; import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; import org.apache.tsfile.read.common.RowRecord; public class DataConsumerExample { public static void main(String[] args) throws IOException { // 5. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root"); consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root"); try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { pullConsumer.open(); pullConsumer.subscribe("topic_all"); while (true) { List<SubscriptionMessage> messages = pullConsumer.poll(10000); for (final SubscriptionMessage message : messages) { final short messageType = message.getMessageType(); if (SubscriptionMessageType.isValidatedMessageType(messageType)) { for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { while (dataSet.hasNext()) { final RowRecord record = dataSet.next(); System.out.println(record); } } } } } } } }
Prerequisite: The format of the topic to be consumed must be of the TsfileHandler type. For example:create topic topic_all_tsfile with ('path'='root.**','format'='TsFileHandler')
import java.io.IOException; import java.util.List; import java.util.Properties; import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; public class DataConsumerExample { public static void main(String[] args) throws IOException { // 1. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); // 2. Specify the consumption type as the tsfile type consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root"); consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root"); consumerConfig.put(ConsumerConstant.FILE_SAVE_DIR_KEY, "/Users/iotdb/Downloads"); try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { pullConsumer.open(); pullConsumer.subscribe("topic_all_tsfile"); while (true) { List<SubscriptionMessage> messages = pullConsumer.poll(10000); for (final SubscriptionMessage message : messages) { message.getTsFileHandler().copyFile("/Users/iotdb/Downloads/1.tsfile"); } } } } }
The consumer-related parameters can be set through the Properties parameter object. The specific parameters are as follows:
| Parameter | required or optional with default | Parameter Meaning |
|---|---|---|
| host | optional: 127.0.0.1 | String: The RPC host of a DataNode in IoTDB |
| port | optional: 6667 | Integer: The RPC port of a DataNode in IoTDB |
| node-urls | optional: 127.0.0.1:6667 | List<String>: The RPC addresses of all DataNodes in IoTDB, which can be multiple; either host:port or node-urls can be filled. If both host:port and node-urls are filled, the union of host:port and node-urls will be taken to form a new node-urls for application |
| username | optional: root | String: The username of the DataNode in IoTDB |
| password | optional: root | String: The password of the DataNode in IoTDB |
| groupId | optional | String: consumer group id,if not specified, it will be randomly assigned (a new consumer group),ensuring that the consumer group id of different consumer groups are all different |
| consumerId | optional | String: consumer client id,if not specified, it will be randomly assigned,ensuring that each consumer client id in the same consumer group is different |
| heartbeatIntervalMs | optional: 30000 (min: 1000) | Long: The interval at which the consumer sends periodic heartbeat requests to the IoTDB DataNode |
| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | Long: The interval at which the consumer detects the expansion or contraction of IoTDB cluster nodes and adjusts the subscription connection |
| fileSaveDir | optional: Paths.get(System.getProperty(“user.dir”), “iotdb-subscription”).toString() | String: The temporary directory path where the consumer stores the subscribed TsFile files |
| fileSaveFsync | optional: false | Boolean: Whether the consumer actively calls fsync during the subscription of TsFiles |
Special configurations in SubscriptionPushConsumer :
| Parameter | required or optional with default | Parameter Meaning |
|---|---|---|
| ackStrategy | optional: ACKStrategy.AFTER_CONSUME | The acknowledgment mechanism for consumption progress includes the following options: ACKStrategy.BEFORE_CONSUME(the consumer submits the consumption progress immediately upon receiving the data, before onReceive )ACKStrategy.AFTER_CONSUME(the consumer submits the consumption progress after consuming the data, after onReceive ) |
| consumeListener | optional | The callback function for consuming data, which needs to implement the ConsumeListener interface, defining the processing logic for consuming SessionDataSetsHandler and TsFileHandler formatted data |
| autoPollIntervalMs | optional: 5000 (min: 500) | Long: The time interval at which the consumer automatically pulls data, in ms |
| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: The timeout duration for the consumer to pull data each time, in ms |
Special configurations in SubscriptionPullConsumer :
| Parameter | required or optional with default | Parameter Meaning |
|---|---|---|
| autoCommit | optional: true | Boolean: Whether to automatically commit the consumption progress. If this parameter is set to false, the commit method needs to be called manually to submit the consumption progress |
| autoCommitInterval | optional: 5000 (min: 500) | Long: The time interval for automatically committing the consumption progress, in ms .This parameter only takes effect when the autoCommit parameter is set to true |
| Function name | Description | Parameter |
|---|---|---|
open() | Opens the consumer connection and starts message consumption. If autoCommit is enabled, it will start the automatic commit worker. | None |
close() | Closes the consumer connection. If autoCommit is enabled, it will commit all uncommitted messages before closing. | None |
poll(final Duration timeout) | Pulls messages with a specified timeout. | timeout : The timeout duration. |
poll(final long timeoutMs) | Pulls messages with a specified timeout in milliseconds. | timeoutMs : The timeout duration in milliseconds. |
poll(final Set<String> topicNames, final Duration timeout) | Pulls messages from specified topics with a specified timeout. | topicNames : The set of topics to pull messages from. timeout: The timeout duration。 |
poll(final Set<String> topicNames, final long timeoutMs) | Pulls messages from specified topics with a specified timeout in milliseconds. | topicNames : The set of topics to pull messages from.timeoutMs: The timeout duration in milliseconds. |
commitSync(final SubscriptionMessage message) | Synchronously commits a single message. | message : The message object to be committed. |
commitSync(final Iterable<SubscriptionMessage> messages) | Synchronously commits multiple messages. | messages : The collection of message objects to be committed. |
commitAsync(final SubscriptionMessage message) | Asynchronously commits a single message. | message : The message object to be committed. |
commitAsync(final Iterable<SubscriptionMessage> messages) | Asynchronously commits multiple messages. | messages : The collection of message objects to be committed. |
commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback) | Asynchronously commits a single message with a specified callback. | message : The message object to be committed. callback : The callback function to be executed after asynchronous commit. |
commitAsync(final Iterable<SubscriptionMessage> messages, final AsyncCommitCallback callback) | Asynchronously commits multiple messages with a specified callback. | messages : The collection of message objects to be committed.callback : The callback function to be executed after asynchronous commit. |
| Function name | Description | Parameter |
|---|---|---|
open() | Opens the consumer connection, starts message consumption, and submits the automatic polling worker. | None |
close() | Closes the consumer connection and stops message consumption. | None |
toString() | Returns the core configuration information of the consumer object. | None |
coreReportMessage() | Obtains the key-value representation of the consumer's core configuration. | None |
allReportMessage() | Obtains the key-value representation of all the consumer's configurations. | None |
buildPushConsumer() | Builds a SubscriptionPushConsumer instance through the Builder | None |
ackStrategy(final AckStrategy ackStrategy) | Configures the message acknowledgment strategy for the consumer. | ackStrategy: The specified message acknowledgment strategy. |
consumeListener(final ConsumeListener consumeListener) | Configures the message consumption logic for the consumer. | consumeListener: The processing logic when the consumer receives messages. |
autoPollIntervalMs(final long autoPollIntervalMs) | Configures the interval for automatic polling. | autoPollIntervalMs : The interval for automatic polling, in milliseconds. |
autoPollTimeoutMs(final long autoPollTimeoutMs) | Configures the timeout for automatic polling.间。 | autoPollTimeoutMs: The timeout for automatic polling, in milliseconds. |