blob: b1b92379f0cc9accf856c1cf4f36fcc7aa8c1ff1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.subscription.agent;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.exception.SubscriptionException;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTopicInfoResp;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaRespExceptionMessage;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.stream.Collectors;
class SubscriptionAgentLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionAgentLauncher.class);
// For fail-over
public static final int MAX_RETRY_TIMES = 5;
private SubscriptionAgentLauncher() {
// Forbidding instantiation
}
public static synchronized void launchSubscriptionTopicAgent() throws StartupException {
int retry = 0;
while (retry < MAX_RETRY_TIMES) {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TGetAllTopicInfoResp getAllTopicInfoResp = configNodeClient.getAllTopicInfo();
if (getAllTopicInfoResp.getStatus().getCode()
!= TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
final String exceptionMessage =
String.format(
"Failed to get all topic info in config node, status is %s",
getAllTopicInfoResp.getStatus());
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
final TPushTopicMetaRespExceptionMessage exceptionMessage =
SubscriptionAgent.topic()
.handleTopicMetaChanges(
getAllTopicInfoResp.getAllTopicInfo().stream()
.map(
byteBuffer -> {
final TopicMeta topicMeta = TopicMeta.deserialize(byteBuffer);
LOGGER.info(
"Pulled topic meta from config node: {}, recovering ...",
topicMeta);
return topicMeta;
})
.collect(Collectors.toList()));
if (Objects.nonNull(exceptionMessage)) {
LOGGER.warn(exceptionMessage.getMessage());
throw new SubscriptionException(exceptionMessage.getMessage());
}
return;
} catch (SubscriptionException | ClientManagerException | TException e) {
retry++;
LOGGER.warn(
"Failed to get topic meta from config node for {} times, will retry at most {} times.",
retry,
MAX_RETRY_TIMES,
e);
try {
Thread.sleep(
retry * SubscriptionConfig.getInstance().getSubscriptionLaunchRetryIntervalMs());
} catch (InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to get topic meta from config node.",
interruptedException);
Thread.currentThread().interrupt();
}
}
}
throw new StartupException("Failed to get topic meta from config node.");
}
public static synchronized void launchSubscriptionConsumerAgent() throws StartupException {
int retry = 0;
while (retry < MAX_RETRY_TIMES) {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TGetAllSubscriptionInfoResp getAllSubscriptionInfoResp =
configNodeClient.getAllSubscriptionInfo();
if (getAllSubscriptionInfoResp.getStatus().getCode()
!= TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
final String exceptionMessage =
String.format(
"Failed to get all subscription info in config node, status is %s",
getAllSubscriptionInfoResp.getStatus());
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
final TPushConsumerGroupMetaRespExceptionMessage exceptionMessage =
SubscriptionAgent.consumer()
.handleConsumerGroupMetaChanges(
getAllSubscriptionInfoResp.getAllSubscriptionInfo().stream()
.map(
byteBuffer -> {
final ConsumerGroupMeta consumerGroupMeta =
ConsumerGroupMeta.deserialize(byteBuffer);
LOGGER.info(
"Pulled consumer group meta from config node: {}, recovering ...",
consumerGroupMeta);
return consumerGroupMeta;
})
.collect(Collectors.toList()));
if (Objects.nonNull(exceptionMessage)) {
LOGGER.warn(exceptionMessage.getMessage());
throw new SubscriptionException(exceptionMessage.getMessage());
}
return;
} catch (SubscriptionException | ClientManagerException | TException e) {
retry++;
LOGGER.warn(
"Failed to get consumer group meta from config node for {} times, will retry at most {} times.",
retry,
MAX_RETRY_TIMES,
e);
try {
Thread.sleep(
retry * SubscriptionConfig.getInstance().getSubscriptionLaunchRetryIntervalMs());
} catch (InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to get consumer group meta from config node.",
interruptedException);
Thread.currentThread().interrupt();
}
}
}
throw new StartupException("Failed to get consumer group meta from config node.");
}
}