blob: ccf73c359831b8d5e0b9a83e5586a68c962e7e58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.session.subscription;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
import org.apache.iotdb.session.util.SessionUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public abstract class SubscriptionConsumer implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConsumer.class);
private static final IoTDBConnectionException NO_PROVIDERS_EXCEPTION =
new IoTDBConnectionException("Cluster has no available subscription providers to connect");
private final List<TEndPoint> initialEndpoints;
private final String username;
private final String password;
private final String consumerId;
private final String consumerGroupId;
private final long heartbeatIntervalMs;
private final long endpointsSyncIntervalMs;
private final SortedMap<Integer, SubscriptionProvider> subscriptionProviders =
new ConcurrentSkipListMap<>();
private final ReentrantReadWriteLock subscriptionProvidersLock = new ReentrantReadWriteLock(true);
private ScheduledExecutorService heartbeatWorkerExecutor;
private ScheduledExecutorService endpointsSyncerExecutor;
private ExecutorService asyncCommitExecutor;
private final AtomicBoolean isClosed = new AtomicBoolean(true);
public String getConsumerId() {
return consumerId;
}
public String getConsumerGroupId() {
return consumerGroupId;
}
/////////////////////////////// ctor ///////////////////////////////
protected SubscriptionConsumer(Builder builder) {
this.initialEndpoints = new ArrayList<>();
// From org.apache.iotdb.session.Session.getNodeUrls
// Priority is given to `host:port` over `nodeUrls`.
if (Objects.nonNull(builder.host)) {
initialEndpoints.add(new TEndPoint(builder.host, builder.port));
} else {
initialEndpoints.addAll(SessionUtils.parseSeedNodeUrls(builder.nodeUrls));
}
this.username = builder.username;
this.password = builder.password;
this.consumerId = builder.consumerId;
this.consumerGroupId = builder.consumerGroupId;
this.heartbeatIntervalMs = builder.heartbeatIntervalMs;
this.endpointsSyncIntervalMs = builder.endpointsSyncIntervalMs;
}
protected SubscriptionConsumer(Builder builder, Properties properties) {
this(
builder
.host(
(String)
properties.getOrDefault(ConsumerConstant.HOST_KEY, SessionConfig.DEFAULT_HOST))
.port(
(Integer)
properties.getOrDefault(ConsumerConstant.PORT_KEY, SessionConfig.DEFAULT_PORT))
.nodeUrls((List<String>) properties.get(ConsumerConstant.NODE_URLS_KEY))
.username(
(String)
properties.getOrDefault(
ConsumerConstant.USERNAME_KEY, SessionConfig.DEFAULT_USER))
.password(
(String)
properties.getOrDefault(
ConsumerConstant.PASSWORD_KEY, SessionConfig.DEFAULT_PASSWORD))
.consumerId((String) properties.get(ConsumerConstant.CONSUMER_ID_KEY))
.consumerGroupId((String) properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY))
.heartbeatIntervalMs(
(Long)
properties.getOrDefault(
ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY,
ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE))
.endpointsSyncIntervalMs(
(Long)
properties.getOrDefault(
ConsumerConstant.ENDPOINTS_SYNC_INTERVAL_MS_KEY,
ConsumerConstant.ENDPOINTS_SYNC_INTERVAL_MS_DEFAULT_VALUE)));
}
/////////////////////////////// open & close ///////////////////////////////
public synchronized void open()
throws TException, IoTDBConnectionException, IOException, StatementExecutionException {
if (!isClosed.get()) {
return;
}
// open subscription providers
acquireWriteLock();
try {
openProviders(); // throw IoTDBConnectionException
} finally {
releaseWriteLock();
}
// launch heartbeat worker
launchHeartbeatWorker();
// launch endpoints syncer
launchEndpointsSyncer();
isClosed.set(false);
}
@Override
public synchronized void close() throws IoTDBConnectionException {
if (isClosed.get()) {
return;
}
try {
// shutdown endpoints syncer
shutdownEndpointsSyncer();
// shutdown workers
shutdownWorkers();
// close subscription providers
acquireWriteLock();
try {
closeProviders();
} finally {
releaseWriteLock();
}
} finally {
isClosed.set(true);
}
}
boolean isClosed() {
return isClosed.get();
}
/////////////////////////////// lock ///////////////////////////////
void acquireReadLock() {
subscriptionProvidersLock.readLock().lock();
}
void releaseReadLock() {
subscriptionProvidersLock.readLock().unlock();
}
void acquireWriteLock() {
subscriptionProvidersLock.writeLock().lock();
}
void releaseWriteLock() {
subscriptionProvidersLock.writeLock().unlock();
}
/////////////////////////////// subscribe & unsubscribe ///////////////////////////////
public void subscribe(String topicName)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
subscribe(Collections.singleton(topicName));
}
public void subscribe(String... topicNames)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
subscribe(new HashSet<>(Arrays.asList(topicNames)));
}
public void subscribe(Set<String> topicNames)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
acquireReadLock();
try {
subscribeWithRedirection(topicNames);
} finally {
releaseReadLock();
}
}
public void unsubscribe(String topicName)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
unsubscribe(Collections.singleton(topicName));
}
public void unsubscribe(String... topicNames)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
unsubscribe(new HashSet<>(Arrays.asList(topicNames)));
}
public void unsubscribe(Set<String> topicNames)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
acquireReadLock();
try {
unsubscribeWithRedirection(topicNames);
} finally {
releaseReadLock();
}
}
/////////////////////////////// heartbeat ///////////////////////////////
@SuppressWarnings("unsafeThreadSchedule")
private void launchHeartbeatWorker() {
heartbeatWorkerExecutor =
Executors.newSingleThreadScheduledExecutor(
r -> {
Thread t =
new Thread(
Thread.currentThread().getThreadGroup(), r, "ConsumerHeartbeatWorker", 0);
if (!t.isDaemon()) {
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
});
heartbeatWorkerExecutor.scheduleAtFixedRate(
new ConsumerHeartbeatWorker(this), 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
}
/**
* Shut down workers upon close. There are currently two workers: heartbeat worker and async
* commit executor.
*/
private void shutdownWorkers() {
heartbeatWorkerExecutor.shutdown();
heartbeatWorkerExecutor = null;
if (asyncCommitExecutor != null) {
asyncCommitExecutor.shutdown();
asyncCommitExecutor = null;
}
}
/////////////////////////////// endpoints syncer ///////////////////////////////
@SuppressWarnings("unsafeThreadSchedule")
private void launchEndpointsSyncer() {
endpointsSyncerExecutor =
Executors.newSingleThreadScheduledExecutor(
r -> {
Thread t =
new Thread(
Thread.currentThread().getThreadGroup(), r, "SubscriptionEndpointsSyncer", 0);
if (!t.isDaemon()) {
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
});
endpointsSyncerExecutor.scheduleAtFixedRate(
new SubscriptionEndpointsSyncer(this), 0, endpointsSyncIntervalMs, TimeUnit.MILLISECONDS);
}
private void shutdownEndpointsSyncer() {
endpointsSyncerExecutor.shutdown();
endpointsSyncerExecutor = null;
}
/////////////////////////////// subscription provider ///////////////////////////////
SubscriptionProvider constructProvider(final TEndPoint endPoint) {
return new SubscriptionProvider(
endPoint, this.username, this.password, this.consumerId, this.consumerGroupId);
}
/** Caller should ensure that the method is called in the lock {@link #acquireWriteLock()}. */
void openProviders() throws IoTDBConnectionException {
// close stale providers
closeProviders();
for (final TEndPoint endPoint : initialEndpoints) {
final SubscriptionProvider defaultProvider;
final int defaultDataNodeId;
try {
defaultProvider = constructProvider(endPoint);
defaultDataNodeId = defaultProvider.handshake();
} catch (final Exception e) {
LOGGER.warn("Failed to create connection with {}, exception: {}", endPoint, e.getMessage());
continue; // try next endpoint
}
addProvider(defaultDataNodeId, defaultProvider);
final Map<Integer, TEndPoint> allEndPoints;
try {
allEndPoints = defaultProvider.getSessionConnection().fetchAllEndPoints();
} catch (final Exception e) {
LOGGER.warn(
"Failed to fetch all endpoints from {}, exception: {}, will retry later...",
endPoint,
e.getMessage());
break; // retry later
}
for (final Map.Entry<Integer, TEndPoint> entry : allEndPoints.entrySet()) {
if (defaultDataNodeId == entry.getKey()) {
continue;
}
final SubscriptionProvider provider;
try {
provider = constructProvider(entry.getValue());
provider.handshake();
} catch (final Exception e) {
LOGGER.warn(
"Failed to create connection with {}, exception: {}, will retry later...",
entry.getValue(),
e.getMessage());
continue; // retry later
}
addProvider(entry.getKey(), provider);
}
break;
}
if (hasNoProviders()) {
throw NO_PROVIDERS_EXCEPTION;
}
}
/////////////////////////////// poll & commit ///////////////////////////////
protected List<SubscriptionMessage> poll(Set<String> topicNames, long timeoutMs)
throws TException, IOException, StatementExecutionException {
List<EnrichedTablets> enrichedTabletsList = new ArrayList<>();
acquireReadLock();
try {
for (final SubscriptionProvider provider : getAllAvailableProviders()) {
// TODO: network timeout
enrichedTabletsList.addAll(provider.getSessionConnection().poll(topicNames, timeoutMs));
}
} finally {
releaseReadLock();
}
return enrichedTabletsList.stream().map(SubscriptionMessage::new).collect(Collectors.toList());
}
protected void commitSync(Iterable<SubscriptionMessage> messages)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
Map<Integer, Map<String, List<String>>> dataNodeIdToTopicNameToSubscriptionCommitIds =
new HashMap<>();
for (SubscriptionMessage message : messages) {
dataNodeIdToTopicNameToSubscriptionCommitIds
.computeIfAbsent(
message.parseDataNodeIdFromSubscriptionCommitId(), (id) -> new HashMap<>())
.computeIfAbsent(message.getTopicName(), (topicName) -> new ArrayList<>())
.add(message.getSubscriptionCommitId());
}
for (Map.Entry<Integer, Map<String, List<String>>> entry :
dataNodeIdToTopicNameToSubscriptionCommitIds.entrySet()) {
commitSyncInternal(entry.getKey(), entry.getValue());
}
}
protected void commitAsync(Iterable<SubscriptionMessage> messages) {
commitAsync(messages, new AsyncCommitCallback() {});
}
protected void commitAsync(Iterable<SubscriptionMessage> messages, AsyncCommitCallback callback) {
// Initiate executor if needed
if (asyncCommitExecutor == null) {
synchronized (this) {
if (asyncCommitExecutor != null) {
return;
}
asyncCommitExecutor =
Executors.newSingleThreadExecutor(
r -> {
Thread t =
new Thread(
Thread.currentThread().getThreadGroup(),
r,
"SubscriptionConsumerAsyncCommitWorker",
0);
if (!t.isDaemon()) {
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
});
}
}
asyncCommitExecutor.submit(new AsyncCommitWorker(messages, callback));
}
/////////////////////////////// utility ///////////////////////////////
private void commitSyncInternal(
int dataNodeId, Map<String, List<String>> topicNameToSubscriptionCommitIds)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
acquireReadLock();
try {
final SubscriptionProvider provider = getProvider(dataNodeId);
if (Objects.isNull(provider) || !provider.isAvailable()) {
throw new IoTDBConnectionException(
String.format(
"something unexpected happened when commit messages to subscription provider with data node id %s, the subscription provider may be unavailable or not existed",
dataNodeId));
}
provider.getSessionConnection().commitSync(topicNameToSubscriptionCommitIds);
} finally {
releaseReadLock();
}
}
/** Caller should ensure that the method is called in the lock {@link #acquireWriteLock()}. */
private void closeProviders() throws IoTDBConnectionException {
for (final SubscriptionProvider provider : getAllProviders()) {
provider.close();
}
subscriptionProviders.clear();
}
/** Caller should ensure that the method is called in the lock {@link #acquireWriteLock()}. */
void addProvider(final int dataNodeId, final SubscriptionProvider provider) {
// the subscription provider is opened
LOGGER.info("add new subscription provider {}", provider);
subscriptionProviders.put(dataNodeId, provider);
}
/** Caller should ensure that the method is called in the lock {@link #acquireWriteLock()}. */
void closeAndRemoveProvider(final int dataNodeId) throws IoTDBConnectionException {
if (!containsProvider(dataNodeId)) {
return;
}
final SubscriptionProvider provider = subscriptionProviders.get(dataNodeId);
try {
provider.close();
} finally {
LOGGER.info("close and remove stale subscription provider {}", provider);
subscriptionProviders.remove(dataNodeId);
}
}
/** Caller should ensure that the method is called in the lock {@link #acquireReadLock()}. */
boolean hasNoProviders() {
return subscriptionProviders.isEmpty();
}
/** Caller should ensure that the method is called in the lock {@link #acquireReadLock()}. */
boolean containsProvider(final int dataNodeId) {
return subscriptionProviders.containsKey(dataNodeId);
}
/** Caller should ensure that the method is called in the lock {@link #acquireReadLock()}. */
List<SubscriptionProvider> getAllAvailableProviders() {
return subscriptionProviders.values().stream()
.filter(SubscriptionProvider::isAvailable)
.collect(Collectors.toList());
}
/** Caller should ensure that the method is called in the lock {@link #acquireReadLock()}. */
List<SubscriptionProvider> getAllProviders() {
return new ArrayList<>(subscriptionProviders.values());
}
/** Caller should ensure that the method is called in the lock {@link #acquireReadLock()}. */
SubscriptionProvider getProvider(final int dataNodeId) {
return containsProvider(dataNodeId) ? subscriptionProviders.get(dataNodeId) : null;
}
/////////////////////////////// redirection ///////////////////////////////
/** Caller should ensure that the method is called in the lock {@link #acquireReadLock()}. */
public void subscribeWithRedirection(final Set<String> topicNames)
throws IoTDBConnectionException {
for (final SubscriptionProvider provider : getAllAvailableProviders()) {
try {
provider.getSessionConnection().subscribe(topicNames);
return;
} catch (final Exception e) {
LOGGER.warn(
"Failed to subscribe topics {} from subscription provider {}, exception: {}, try next subscription provider...",
topicNames,
provider,
e.getMessage());
}
}
throw NO_PROVIDERS_EXCEPTION;
}
/** Caller should ensure that the method is called in the lock {@link #acquireReadLock()}. */
public void unsubscribeWithRedirection(final Set<String> topicNames)
throws IoTDBConnectionException {
for (final SubscriptionProvider provider : getAllAvailableProviders()) {
try {
provider.getSessionConnection().unsubscribe(topicNames);
return;
} catch (final Exception e) {
LOGGER.warn(
"Failed to unsubscribe topics {} from subscription provider {}, exception: {}, try next subscription provider...",
topicNames,
provider,
e.getMessage());
}
}
throw NO_PROVIDERS_EXCEPTION;
}
/** Caller should ensure that the method is called in the lock {@link #acquireReadLock()}. */
public Map<Integer, TEndPoint> fetchAllEndPointsWithRedirection()
throws IoTDBConnectionException {
Map<Integer, TEndPoint> endPoints = null;
for (final SubscriptionProvider provider : getAllAvailableProviders()) {
try {
endPoints = provider.getSessionConnection().fetchAllEndPoints();
break;
} catch (final Exception e) {
LOGGER.warn(
"Failed to fetch all endpoints from subscription provider {}, exception: {}, try next subscription provider...",
provider,
e.getMessage());
}
}
if (Objects.isNull(endPoints)) {
throw NO_PROVIDERS_EXCEPTION;
}
return endPoints;
}
/////////////////////////////// builder ///////////////////////////////
public abstract static class Builder {
protected String host = SessionConfig.DEFAULT_HOST;
protected int port = SessionConfig.DEFAULT_PORT;
protected List<String> nodeUrls = null;
protected String username = SessionConfig.DEFAULT_USER;
protected String password = SessionConfig.DEFAULT_PASSWORD;
protected String consumerId;
protected String consumerGroupId;
protected long heartbeatIntervalMs = ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE;
protected long endpointsSyncIntervalMs =
ConsumerConstant.ENDPOINTS_SYNC_INTERVAL_MS_DEFAULT_VALUE;
public Builder host(String host) {
this.host = host;
return this;
}
public Builder port(int port) {
this.port = port;
return this;
}
public Builder nodeUrls(List<String> nodeUrls) {
this.nodeUrls = nodeUrls;
return this;
}
public Builder username(String username) {
this.username = username;
return this;
}
public Builder password(String password) {
this.password = password;
return this;
}
public Builder consumerId(String consumerId) {
this.consumerId = consumerId;
return this;
}
public Builder consumerGroupId(String consumerGroupId) {
this.consumerGroupId = consumerGroupId;
return this;
}
public Builder heartbeatIntervalMs(long heartbeatIntervalMs) {
this.heartbeatIntervalMs =
Math.max(heartbeatIntervalMs, ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE);
return this;
}
public Builder endpointsSyncIntervalMs(long endpointsSyncIntervalMs) {
this.endpointsSyncIntervalMs =
Math.max(endpointsSyncIntervalMs, ConsumerConstant.ENDPOINTS_SYNC_INTERVAL_MS_MIN_VALUE);
return this;
}
public abstract SubscriptionPullConsumer buildPullConsumer();
public abstract SubscriptionPushConsumer buildPushConsumer();
}
class AsyncCommitWorker implements Runnable {
private final Iterable<SubscriptionMessage> messages;
private final AsyncCommitCallback callback;
public AsyncCommitWorker(Iterable<SubscriptionMessage> messages, AsyncCommitCallback callback) {
this.messages = messages;
this.callback = callback;
}
@Override
public void run() {
if (isClosed()) {
return;
}
try {
commitSync(messages);
callback.onComplete();
} catch (Exception e) {
callback.onFailure(e);
}
}
}
}