blob: 5d9a473c0590555306c294ac2beebf1e7ceae820 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.subscription.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.broker.SerializedEnrichedEvent;
import org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCloseReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCommitReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHandshakeReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHeartbeatReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribePollReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeRequestType;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeRequestVersion;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeSubscribeReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeUnsubscribeReq;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeCloseResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeCommitResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHeartbeatResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribePollResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeResponseType;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeResponseVersion;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeSubscribeResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeUnsubscribeResp;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
import org.apache.thrift.TException;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class SubscriptionReceiverV1 implements SubscriptionReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionReceiverV1.class);
private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();
private static final TPipeSubscribeResp SUBSCRIPTION_MISSING_CUSTOMER_RESP =
new TPipeSubscribeResp(
RpcUtils.getStatus(
TSStatusCode.SUBSCRIPTION_MISSING_CUSTOMER,
"Missing consumer config, please handshake first."),
PipeSubscribeResponseVersion.VERSION_1.getVersion(),
PipeSubscribeResponseType.ACK.getType());
private final ThreadLocal<ConsumerConfig> consumerConfigThreadLocal = new ThreadLocal<>();
@Override
public PipeSubscribeRequestVersion getVersion() {
return PipeSubscribeRequestVersion.VERSION_1;
}
@Override
public void handleExit() {
LOGGER.info(
"Subscription: remove consumer config {} when handling exit",
consumerConfigThreadLocal.get());
consumerConfigThreadLocal.remove();
}
@Override
public final TPipeSubscribeResp handle(TPipeSubscribeReq req) {
final short reqType = req.getType();
if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) {
switch (PipeSubscribeRequestType.valueOf(reqType)) {
case HANDSHAKE:
return handlePipeSubscribeHandshake(PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req));
case HEARTBEAT:
return handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req));
case SUBSCRIBE:
return handlePipeSubscribeSubscribe(PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req));
case UNSUBSCRIBE:
return handlePipeSubscribeUnsubscribe(
PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req));
case POLL:
return handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req));
case COMMIT:
return handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req));
case CLOSE:
return handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req));
default:
break;
}
}
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.SUBSCRIPTION_TYPE_ERROR,
String.format("Unknown PipeSubscribeRequestType %s.", reqType));
LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response status = {}.", status);
return new TPipeSubscribeResp(
status,
PipeSubscribeResponseVersion.VERSION_1.getVersion(),
PipeSubscribeResponseType.ACK.getType());
}
private TPipeSubscribeResp handlePipeSubscribeHandshake(PipeSubscribeHandshakeReq req) {
try {
return handlePipeSubscribeHandshakeInternal(req);
} catch (SubscriptionException e) {
final String exceptionMessage =
String.format(
"Subscription: something unexpected happened when handshaking: %s, req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HANDSHAKE_ERROR, exceptionMessage));
}
}
private TPipeSubscribeResp handlePipeSubscribeHandshakeInternal(PipeSubscribeHandshakeReq req)
throws SubscriptionException {
// set consumer config thread local
ConsumerConfig existedConsumerConfig = consumerConfigThreadLocal.get();
ConsumerConfig consumerConfig = req.getConsumerConfig();
if (Objects.isNull(existedConsumerConfig)) {
consumerConfigThreadLocal.set(consumerConfig);
} else {
if (!existedConsumerConfig.equals(consumerConfig)) {
LOGGER.warn(
"Subscription: Detect stale consumer config when handshaking, stale consumer config {} will be cleared, consumer config will set to the incoming consumer config {}.",
existedConsumerConfig,
consumerConfig);
// drop stale consumer
dropConsumer(existedConsumerConfig);
consumerConfigThreadLocal.set(consumerConfig);
}
}
// create consumer if not existed
if (!SubscriptionAgent.consumer()
.isConsumerExisted(consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId())) {
createConsumer(consumerConfig);
} else {
LOGGER.info(
"Subscription: The consumer {} has already existed when handshaking, skip creating consumer.",
consumerConfig);
}
int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
LOGGER.info(
"Subscription: consumer {} handshake successfully, data node id: {}",
req.getConsumerConfig(),
dataNodeId);
return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, dataNodeId);
}
private TPipeSubscribeResp handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq req) {
try {
return handlePipeSubscribeHeartbeatInternal(req);
} catch (SubscriptionException e) {
final String exceptionMessage =
String.format(
"Subscription: something unexpected happened when heartbeat: %s, req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HEARTBEAT_ERROR, exceptionMessage));
}
}
private TPipeSubscribeResp handlePipeSubscribeHeartbeatInternal(PipeSubscribeHeartbeatReq req) {
// check consumer config thread local
ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
if (Objects.isNull(consumerConfig)) {
LOGGER.warn(
"Subscription: missing consumer config when handling PipeSubscribeHeartbeatReq: {}", req);
return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
}
// TODO: do something
LOGGER.info("Subscription: consumer {} heartbeat successfully", consumerConfig);
return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
}
private TPipeSubscribeResp handlePipeSubscribeSubscribe(PipeSubscribeSubscribeReq req) {
try {
return handlePipeSubscribeSubscribeInternal(req);
} catch (SubscriptionException e) {
final String exceptionMessage =
String.format(
"Subscription: something unexpected happened when subscribing: %s, req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR, exceptionMessage));
}
}
private TPipeSubscribeResp handlePipeSubscribeSubscribeInternal(PipeSubscribeSubscribeReq req) {
// check consumer config thread local
ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
if (Objects.isNull(consumerConfig)) {
LOGGER.warn(
"Subscription: missing consumer config when handling PipeSubscribeSubscribeReq: {}", req);
return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
}
// subscribe topics
Set<String> topicNames = req.getTopicNames();
topicNames = topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
subscribe(consumerConfig, topicNames);
LOGGER.info("Subscription: consumer {} subscribe {} successfully", consumerConfig, topicNames);
return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
}
private TPipeSubscribeResp handlePipeSubscribeUnsubscribe(PipeSubscribeUnsubscribeReq req) {
try {
return handlePipeSubscribeUnsubscribeInternal(req);
} catch (SubscriptionException e) {
final String exceptionMessage =
String.format(
"Subscription: something unexpected happened when unsubscribing: %s, req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR, exceptionMessage));
}
}
private TPipeSubscribeResp handlePipeSubscribeUnsubscribeInternal(
PipeSubscribeUnsubscribeReq req) {
// check consumer config thread local
ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
if (Objects.isNull(consumerConfig)) {
LOGGER.warn(
"Subscription: missing consumer config when handling PipeSubscribeUnsubscribeReq: {}",
req);
return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
}
// unsubscribe topics
Set<String> topicNames = req.getTopicNames();
topicNames = topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
unsubscribe(consumerConfig, topicNames);
LOGGER.info(
"Subscription: consumer {} unsubscribe {} successfully", consumerConfig, topicNames);
return PipeSubscribeUnsubscribeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
}
private TPipeSubscribeResp handlePipeSubscribePoll(PipeSubscribePollReq req) {
try {
return handlePipeSubscribePollInternal(req);
} catch (SubscriptionException e) {
final String exceptionMessage =
String.format(
"Subscription: something unexpected happened when polling: %s, req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR, exceptionMessage));
}
}
private TPipeSubscribeResp handlePipeSubscribePollInternal(PipeSubscribePollReq req) {
// check consumer config thread local
ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
if (Objects.isNull(consumerConfig)) {
LOGGER.warn(
"Subscription: missing consumer config when handling PipeSubscribePollReq: {}", req);
return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
}
// poll
Set<String> topicNames = req.getTopicNames();
if (topicNames.isEmpty()) {
// poll all subscribed topics
topicNames =
SubscriptionAgent.consumer()
.getTopicsSubscribedByConsumer(
consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId());
} else {
topicNames = topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet());
}
SubscriptionPollTimer timer =
new SubscriptionPollTimer(
System.currentTimeMillis(),
req.getTimeoutMs() == 0
? SubscriptionConfig.getInstance().getSubscriptionDefaultPollTimeoutMs()
: Math.max(
req.getTimeoutMs(),
SubscriptionConfig.getInstance().getSubscriptionMinPollTimeoutMs()));
List<SerializedEnrichedEvent> events =
SubscriptionAgent.broker().poll(consumerConfig, topicNames, timer);
List<String> subscriptionCommitIds =
events.stream()
.map(SerializedEnrichedEvent::getSubscriptionCommitId)
.collect(Collectors.toList());
if (timer.isExpired()) {
LOGGER.warn(
"Subscription: timeout happened when consumer {} poll topics {}",
consumerConfig,
topicNames);
}
LOGGER.info(
"Subscription: consumer {} poll topics {} successfully, commit ids: {}",
consumerConfig,
topicNames,
subscriptionCommitIds);
List<Pair<ByteBuffer, EnrichedTablets>> enrichedTabletsWithByteBufferList =
events.stream()
.map(event -> new Pair<>(event.getByteBuffer(), event.getEnrichedTablets()))
.collect(Collectors.toList());
TPipeSubscribeResp resp =
PipeSubscribePollResp.toTPipeSubscribeResp(
RpcUtils.SUCCESS_STATUS, enrichedTabletsWithByteBufferList);
events.forEach(SerializedEnrichedEvent::resetByteBuffer);
return resp;
}
private TPipeSubscribeResp handlePipeSubscribeCommit(PipeSubscribeCommitReq req) {
try {
return handlePipeSubscribeCommitInternal(req);
} catch (SubscriptionException e) {
final String exceptionMessage =
String.format(
"Subscription: something unexpected happened when committing: %s, req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_COMMIT_ERROR, exceptionMessage));
}
}
private TPipeSubscribeResp handlePipeSubscribeCommitInternal(PipeSubscribeCommitReq req) {
// check consumer config thread local
ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
if (Objects.isNull(consumerConfig)) {
LOGGER.warn(
"Subscription: missing consumer config when handling PipeSubscribeCommitReq: {}", req);
return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
}
// commit
Map<String, List<String>> topicNameToSubscriptionCommitIds =
req.getTopicNameToSubscriptionCommitIds();
SubscriptionAgent.broker().commit(consumerConfig, topicNameToSubscriptionCommitIds);
LOGGER.info(
"Subscription: consumer commit {} successfully, commit ids: {}",
consumerConfig,
topicNameToSubscriptionCommitIds);
return PipeSubscribeCommitResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
}
private TPipeSubscribeResp handlePipeSubscribeClose(PipeSubscribeCloseReq req) {
try {
return handlePipeSubscribeCloseInternal(req);
} catch (SubscriptionException e) {
final String exceptionMessage =
String.format(
"Subscription: something unexpected happened when closing: %s, req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_COMMIT_ERROR, exceptionMessage));
}
}
private TPipeSubscribeResp handlePipeSubscribeCloseInternal(PipeSubscribeCloseReq req) {
// check consumer config thread local
ConsumerConfig consumerConfig = consumerConfigThreadLocal.get();
if (Objects.isNull(consumerConfig)) {
LOGGER.warn(
"Subscription: missing consumer config when handling PipeSubscribeCloseReq: {}", req);
return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
}
// unsubscribe all subscribed topics
Set<String> topics =
SubscriptionAgent.consumer()
.getTopicsSubscribedByConsumer(
consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId());
if (!topics.isEmpty()) {
LOGGER.info(
"Subscription: unsubscribe all subscribed topics {} before close consumer {}",
topics,
consumerConfig);
unsubscribe(consumerConfig, topics);
}
// drop consumer if existed
if (SubscriptionAgent.consumer()
.isConsumerExisted(consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId())) {
dropConsumer(consumerConfig);
} else {
LOGGER.info(
"Subscription: The consumer {} does not existed when closing, skip dropping consumer.",
consumerConfig);
}
LOGGER.info("Subscription: consumer {} close successfully", consumerConfig);
return PipeSubscribeCloseResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
}
//////////////////////////// consumer operations ////////////////////////////
private void createConsumer(ConsumerConfig consumerConfig) throws SubscriptionException {
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TCreateConsumerReq req =
new TCreateConsumerReq()
.setConsumerId(consumerConfig.getConsumerId())
.setConsumerGroupId(consumerConfig.getConsumerGroupId())
.setConsumerAttributes(consumerConfig.getAttribute());
final TSStatus tsStatus = configNodeClient.createConsumer(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
final String exceptionMessage =
String.format(
"Subscription: Failed to create consumer %s in config node, status is %s.",
consumerConfig, tsStatus);
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
} catch (ClientManagerException | TException e) {
final String exceptionMessage =
String.format(
"Subscription: Failed to create consumer %s in config node, exception is %s.",
consumerConfig, e.getMessage());
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
}
private void dropConsumer(ConsumerConfig consumerConfig) throws SubscriptionException {
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TCloseConsumerReq req =
new TCloseConsumerReq()
.setConsumerId(consumerConfig.getConsumerId())
.setConsumerGroupId(consumerConfig.getConsumerGroupId());
final TSStatus tsStatus = configNodeClient.closeConsumer(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
final String exceptionMessage =
String.format(
"Subscription: Failed to close consumer %s in config node, status is %s.",
consumerConfig, tsStatus);
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
} catch (ClientManagerException | TException e) {
final String exceptionMessage =
String.format(
"Subscription: Failed to close consumer %s in config node, exception is %s.",
consumerConfig, e.getMessage());
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
// TODO: broker TTL if no consumer in consumer group
}
private void subscribe(ConsumerConfig consumerConfig, Set<String> topicNames)
throws SubscriptionException {
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSubscribeReq req =
new TSubscribeReq()
.setConsumerId(consumerConfig.getConsumerId())
.setConsumerGroupId(consumerConfig.getConsumerGroupId())
.setTopicNames(topicNames);
final TSStatus tsStatus = configNodeClient.createSubscription(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
final String exceptionMessage =
String.format(
"Subscription: Failed to subscribe topics %s for consumer %s in config node, status is %s.",
topicNames, consumerConfig, tsStatus);
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
} catch (ClientManagerException | TException e) {
final String exceptionMessage =
String.format(
"Subscription: Failed to subscribe topics %s for consumer %s in config node, exception is %s.",
topicNames, consumerConfig, e.getMessage());
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
}
private void unsubscribe(ConsumerConfig consumerConfig, Set<String> topicNames)
throws SubscriptionException {
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TUnsubscribeReq req =
new TUnsubscribeReq()
.setConsumerId(consumerConfig.getConsumerId())
.setConsumerGroupId(consumerConfig.getConsumerGroupId())
.setTopicNames(topicNames);
final TSStatus tsStatus = configNodeClient.dropSubscription(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
final String exceptionMessage =
String.format(
"Subscription: Failed to unsubscribe topics %s for consumer %s in config node, status is %s.",
topicNames, consumerConfig, tsStatus);
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
} catch (ClientManagerException | TException e) {
final String exceptionMessage =
String.format(
"Subscription: Failed to unsubscribe topics %s for consumer %s in config node, exception is %s.",
topicNames, consumerConfig, e.getMessage());
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
}
}