// blob: e73774e59b5f64d9cdd815190091dd4b9ef21adf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.java.impl.consumer;
import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.Status;
import com.google.common.math.IntMath;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import net.javacrumbs.futureconverter.java8guava.FutureConverter;
import org.apache.commons.lang3.RandomUtils;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.exception.StatusChecker;
import org.apache.rocketmq.client.java.impl.Settings;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.rpc.RpcFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of {@link SimpleConsumer}
*/
@SuppressWarnings("UnstableApiUsage")
class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
// Class-wide logger.
private static final Logger log = LoggerFactory.getLogger(SimpleConsumerImpl.class);
// Settings object built in the constructor and exposed through getSettings().
private final SimpleSubscriptionSettings simpleSubscriptionSettings;
// Consumer group name supplied at construction; returned by getConsumerGroup().
private final String consumerGroup;
// Await duration supplied at construction; forwarded to every receive request built by receive0().
private final Duration awaitDuration;
// Monotonically increasing counter used by receive0() to rotate over the subscribed topics.
private final AtomicInteger topicIndex;
// Topic -> filter expression. This is the very map instance passed to the constructor (no copy is made);
// it is mutated by subscribe()/unsubscribe() and snapshotted by receive0()/getSubscriptionExpressions().
private final Map<String /* topic */, FilterExpression> subscriptionExpressions;
// Topic -> load balancer derived from that topic's route data; filled lazily and refreshed on route updates.
private final ConcurrentMap<String /* topic */, SubscriptionLoadBalancer> subscriptionRouteDataCache;
/**
 * Creates a simple consumer.
 *
 * <p>The given subscription map is stored by reference (it is not copied) and the same instance is handed to
 * the {@link SimpleSubscriptionSettings}; its key set is passed to the parent constructor.
 *
 * @param clientConfiguration     client configuration; supplies the namespace and request timeout.
 * @param consumerGroup           name of the consumer group.
 * @param awaitDuration           await duration forwarded to the settings and to each receive request.
 * @param subscriptionExpressions initial topic to filter-expression bindings.
 */
public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
    Map<String, FilterExpression> subscriptionExpressions) {
    super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
    // Plain state first.
    this.consumerGroup = consumerGroup;
    this.awaitDuration = awaitDuration;
    this.subscriptionExpressions = subscriptionExpressions;
    this.subscriptionRouteDataCache = new ConcurrentHashMap<>();
    // Random starting offset for the topic rotation performed in receive0().
    this.topicIndex = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
    // Settings are assembled last; they only depend on constructor arguments and inherited state.
    final Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
    this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(
        clientConfiguration.getNamespace(),
        clientId,
        endpoints,
        groupResource,
        clientConfiguration.getRequestTimeout(),
        awaitDuration,
        subscriptionExpressions);
}
/**
 * Starts the simple consumer by delegating to the parent start-up routine.
 *
 * <p>If start-up fails, {@link #shutDown()} is invoked to release whatever was already set up, and the original
 * failure is rethrown. A failure raised by that cleanup is attached to the original failure as a suppressed
 * exception instead of replacing it, so the root cause of the failed start-up is never lost.
 *
 * @throws Exception if the parent start-up fails.
 */
@Override
protected void startUp() throws Exception {
    try {
        log.info("Begin to start the rocketmq simple consumer, clientId={}", clientId);
        super.startUp();
        log.info("The rocketmq simple consumer starts successfully, clientId={}", clientId);
    } catch (Throwable t) {
        log.error("Failed to start the rocketmq simple consumer, try to shutdown it, clientId={}", clientId, t);
        try {
            shutDown();
        } catch (Throwable cleanupFailure) {
            if (cleanupFailure instanceof InterruptedException) {
                // Keep the interruption visible to the caller even though the exception itself is suppressed.
                Thread.currentThread().interrupt();
            }
            // addSuppressed rejects self-suppression, hence the identity guard.
            if (cleanupFailure != t) {
                t.addSuppressed(cleanupFailure);
            }
        }
        throw t;
    }
}
/**
 * Shuts the simple consumer down by delegating to the parent shutdown routine, logging before and after.
 *
 * @throws InterruptedException if the parent shutdown is interrupted.
 */
@Override
protected void shutDown() throws InterruptedException {
    log.info("Begin to shutdown the rocketmq simple consumer, clientId={}", clientId);

    super.shutDown();

    log.info("Shutdown the rocketmq simple consumer successfully, clientId={}", clientId);
}
/**
 * Returns the consumer group name this consumer was created with.
 *
 * @see SimpleConsumer#getConsumerGroup()
 */
@Override
public String getConsumerGroup() {
    return this.consumerGroup;
}
/**
 * Adds (or replaces) the filter expression bound to a topic.
 *
 * <p>The topic's route data future is resolved through {@code handleClientFuture} before the subscription is
 * recorded, so the subscription map is left untouched if that step fails.
 *
 * @param topic            topic to subscribe to.
 * @param filterExpression filter expression to bind to the topic.
 * @return this consumer, to allow call chaining.
 * @throws ClientException       if resolving the route data fails.
 * @throws IllegalStateException if the consumer is not running.
 * @see SimpleConsumer#subscribe(String, FilterExpression)
 */
@Override
public SimpleConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException {
    // Subscriptions may only be changed while the consumer is running.
    final boolean running = this.isRunning();
    if (!running) {
        log.error("Unable to add subscription because simple consumer is not running, state={}, clientId={}",
            this.state(), clientId);
        throw new IllegalStateException("Simple consumer is not running now");
    }
    // Resolve the route first; only a successfully routed topic is registered.
    handleClientFuture(getRouteData(topic));
    subscriptionExpressions.put(topic, filterExpression);
    return this;
}
/**
 * Removes the subscription of a topic, if any.
 *
 * <p>Only the subscription map is touched; a load balancer already cached for the topic is not evicted here.
 *
 * @param topic topic to unsubscribe from.
 * @return this consumer, to allow call chaining.
 * @throws IllegalStateException if the consumer is not running.
 * @see SimpleConsumer#unsubscribe(String)
 */
@Override
public SimpleConsumer unsubscribe(String topic) {
    // Subscriptions may only be changed while the consumer is running.
    final boolean running = this.isRunning();
    if (!running) {
        log.error("Unable to remove subscription because simple consumer is not running, state={}, "
            + "clientId={}", this.state(), clientId);
        throw new IllegalStateException("Simple consumer is not running now");
    }
    subscriptionExpressions.remove(topic);
    return this;
}
/**
 * Returns a snapshot of the current subscriptions.
 *
 * <p>The returned map is a fresh copy; modifying it does not affect this consumer.
 *
 * @return a new map from topic to filter expression.
 * @see SimpleConsumer#getSubscriptionExpressions()
 */
@Override
public Map<String, FilterExpression> getSubscriptionExpressions() {
    final Map<String, FilterExpression> snapshot = new HashMap<>(subscriptionExpressions);
    return snapshot;
}
/**
 * Builds the heartbeat request for this consumer: the protobuf group plus the {@code SIMPLE_CONSUMER}
 * client type.
 *
 * @return the heartbeat request.
 */
@Override
public HeartbeatRequest wrapHeartbeatRequest() {
    final HeartbeatRequest.Builder builder = HeartbeatRequest.newBuilder();
    builder.setGroup(getProtobufGroup());
    builder.setClientType(ClientType.SIMPLE_CONSUMER);
    return builder.build();
}
/**
 * Synchronous receive: starts the receive via {@link #receive0(int, Duration)} and resolves the resulting
 * future through {@code handleClientFuture}.
 *
 * @param maxMessageNum     maximum number of messages to fetch; must be greater than 0.
 * @param invisibleDuration invisible duration applied to the received messages.
 * @return the received message views.
 * @throws ClientException if the receive fails.
 * @see SimpleConsumer#receive(int, Duration)
 */
@Override
public List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException {
    return handleClientFuture(receive0(maxMessageNum, invisibleDuration));
}
/**
 * Asynchronous receive: starts the receive via {@link #receive0(int, Duration)} and exposes the resulting
 * Guava future as a {@link CompletableFuture}.
 *
 * @param maxMessageNum     maximum number of messages to fetch; must be greater than 0.
 * @param invisibleDuration invisible duration applied to the received messages.
 * @return a future completed with the received message views, or completed exceptionally on failure.
 * @see SimpleConsumer#receiveAsync(int, Duration)
 */
@Override
public CompletableFuture<List<MessageView>> receiveAsync(int maxMessageNum, Duration invisibleDuration) {
    return FutureConverter.toCompletableFuture(receive0(maxMessageNum, invisibleDuration));
}
/**
 * Shared implementation behind {@code receive} and {@code receiveAsync}.
 *
 * <p>Precondition failures are never thrown; they are reported through an immediately failed future:
 * {@link IllegalStateException} when the consumer is not running, {@link IllegalArgumentException} when
 * {@code maxMessageNum} is not positive or when no topic is subscribed.
 *
 * <p>Each call picks one subscribed topic by rotating {@code topicIndex} over a snapshot of the subscriptions,
 * takes a message queue from that topic's load balancer and issues a single receive request against it.
 *
 * @param maxMessageNum     maximum number of messages to fetch; must be greater than 0.
 * @param invisibleDuration invisible duration applied to the received messages.
 * @return a future holding the received message views.
 */
public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration) {
    if (!this.isRunning()) {
        log.error("Unable to receive message because simple consumer is not running, state={}, clientId={}",
            this.state(), clientId);
        return Futures.immediateFailedFuture(new IllegalStateException("Simple consumer is not running now"));
    }
    if (maxMessageNum <= 0) {
        return Futures.immediateFailedFuture(
            new IllegalArgumentException("maxMessageNum must be greater than 0"));
    }
    // Work on a snapshot so that the topic list and the filter lookup below see the same subscriptions.
    final Map<String, FilterExpression> snapshot = new HashMap<>(subscriptionExpressions);
    // No topic is subscribed.
    if (snapshot.isEmpty()) {
        return Futures.immediateFailedFuture(
            new IllegalArgumentException("There is no topic to receive message"));
    }
    final List<String> candidates = new ArrayList<>(snapshot.keySet());
    // IntMath.mod keeps the position non-negative even after the counter overflows.
    final int position = IntMath.mod(topicIndex.getAndIncrement(), candidates.size());
    final String topic = candidates.get(position);
    final FilterExpression filterExpression = snapshot.get(topic);

    final ListenableFuture<SubscriptionLoadBalancer> balancerFuture = getSubscriptionLoadBalancer(topic);
    final ListenableFuture<ReceiveMessageResult> resultFuture =
        Futures.transformAsync(balancerFuture, loadBalancer -> {
            final MessageQueueImpl mq = loadBalancer.takeMessageQueue();
            final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq,
                filterExpression, invisibleDuration, awaitDuration);
            return receiveMessage(request, mq, awaitDuration);
        }, MoreExecutors.directExecutor());
    // The final hop runs on the client callback executor.
    return Futures.transformAsync(resultFuture,
        received -> Futures.immediateFuture(received.getMessageViews()), clientCallbackExecutor);
}
/**
 * Synchronous acknowledgement: starts the ack via {@code ack0} and resolves the resulting future through
 * {@code handleClientFuture}.
 *
 * @param messageView message to acknowledge.
 * @throws ClientException if the acknowledgement fails.
 * @see SimpleConsumer#ack(MessageView)
 */
@Override
public void ack(MessageView messageView) throws ClientException {
    handleClientFuture(ack0(messageView));
}
/**
 * Asynchronous acknowledgement: starts the ack via {@code ack0} and exposes the resulting Guava future as a
 * {@link CompletableFuture}.
 *
 * @param messageView message to acknowledge.
 * @return a future completed once the acknowledgement succeeded, or completed exceptionally on failure.
 * @see SimpleConsumer#ackAsync(MessageView)
 */
@Override
public CompletableFuture<Void> ackAsync(MessageView messageView) {
    return FutureConverter.toCompletableFuture(ack0(messageView));
}
/**
 * Shared implementation behind {@code ack} and {@code ackAsync}.
 *
 * <p>Precondition failures are reported through an immediately failed future: {@link IllegalStateException}
 * when the consumer is not running, {@link IllegalArgumentException} when the message view is not a
 * {@link MessageViewImpl}. The response status is validated with {@link StatusChecker} on the client callback
 * executor.
 *
 * @param messageView message to acknowledge.
 * @return a future completed once the acknowledgement has been accepted.
 */
private ListenableFuture<Void> ack0(MessageView messageView) {
    // The consumer must be running to talk to the server.
    if (!this.isRunning()) {
        log.error("Unable to ack message because simple consumer is not running, state={}, clientId={}",
            this.state(), clientId);
        return Futures.immediateFailedFuture(new IllegalStateException("Simple consumer is not running now"));
    }
    // Only this client's own message view implementation can be acknowledged.
    if (!(messageView instanceof MessageViewImpl)) {
        return Futures.immediateFailedFuture(
            new IllegalArgumentException("Failed downcasting for messageView"));
    }
    final RpcFuture<AckMessageRequest, AckMessageResponse> rpcFuture = ackMessage((MessageViewImpl) messageView);
    return Futures.transformAsync(rpcFuture, response -> {
        StatusChecker.check(response.getStatus(), rpcFuture);
        return Futures.immediateVoidFuture();
    }, clientCallbackExecutor);
}
/**
 * Synchronous variant: starts the change via {@link #changeInvisibleDuration0(MessageView, Duration)} and
 * resolves the resulting future through {@code handleClientFuture}.
 *
 * @param messageView       message whose invisible duration is changed.
 * @param invisibleDuration new invisible duration.
 * @throws ClientException if the change fails.
 * @see SimpleConsumer#changeInvisibleDuration(MessageView, Duration)
 */
@Override
public void changeInvisibleDuration(MessageView messageView, Duration invisibleDuration) throws ClientException {
    handleClientFuture(changeInvisibleDuration0(messageView, invisibleDuration));
}
/**
 * Asynchronous variant: starts the change via {@link #changeInvisibleDuration0(MessageView, Duration)} and
 * exposes the resulting Guava future as a {@link CompletableFuture}.
 *
 * @param messageView       message whose invisible duration is changed.
 * @param invisibleDuration new invisible duration.
 * @return a future completed once the change succeeded, or completed exceptionally on failure.
 * @see SimpleConsumer#changeInvisibleDurationAsync(MessageView, Duration)
 */
@Override
public CompletableFuture<Void> changeInvisibleDurationAsync(MessageView messageView, Duration invisibleDuration) {
    return FutureConverter.toCompletableFuture(changeInvisibleDuration0(messageView, invisibleDuration));
}
/**
 * Shared implementation behind {@code changeInvisibleDuration} and {@code changeInvisibleDurationAsync}.
 *
 * <p>Precondition failures are reported through an immediately failed future: {@link IllegalStateException}
 * when the consumer is not running, {@link IllegalArgumentException} when the message view is not a
 * {@link MessageViewImpl}.
 *
 * <p>On success the message view's receipt handle is replaced by the one carried in the response. The
 * response status is validated <em>before</em> that replacement, so a rejected request leaves the message
 * view's current receipt handle intact.
 *
 * @param messageView       message whose invisible duration is changed.
 * @param invisibleDuration new invisible duration.
 * @return a future completed once the change has been accepted.
 */
public ListenableFuture<Void> changeInvisibleDuration0(MessageView messageView, Duration invisibleDuration) {
    // Check consumer status.
    if (!this.isRunning()) {
        log.error("Unable to change invisible duration because simple consumer is not running, state={}, "
            + "clientId={}", this.state(), clientId);
        final IllegalStateException e = new IllegalStateException("Simple consumer is not running now");
        return Futures.immediateFailedFuture(e);
    }
    if (!(messageView instanceof MessageViewImpl)) {
        final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for "
            + "messageView");
        return Futures.immediateFailedFuture(exception);
    }
    MessageViewImpl impl = (MessageViewImpl) messageView;
    final RpcFuture<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse> future =
        changeInvisibleDuration(impl, invisibleDuration);
    return Futures.transformAsync(future, response -> {
        // Validate first: StatusChecker is expected to throw on a failed status, and a failed response
        // must not overwrite the receipt handle the caller still holds.
        final Status status = response.getStatus();
        StatusChecker.check(status, future);
        // Refresh receipt handle manually.
        impl.setReceiptHandle(response.getReceiptHandle());
        return Futures.immediateVoidFuture();
    }, MoreExecutors.directExecutor());
}
/**
 * Closes the consumer: requests an asynchronous stop and waits until the service has terminated.
 *
 * @see SimpleConsumer#close()
 */
@Override
public void close() {
    stopAsync().awaitTerminated();
}
/**
 * Returns the subscription settings created for this consumer in the constructor.
 *
 * @return the settings of this simple consumer.
 */
@Override
public Settings getSettings() {
    return this.simpleSubscriptionSettings;
}
/**
 * Creates or refreshes the cached load balancer of a topic from new route data.
 *
 * <p>The read-modify-write is done with {@link ConcurrentMap#compute} so that the lookup of the previous
 * load balancer and the store of its successor form a single step per topic; a separate {@code get} followed
 * by {@code put} would let two concurrent updates both derive from the same previous value and overwrite
 * each other.
 *
 * @param topic          topic the route data belongs to.
 * @param topicRouteData latest route data of the topic.
 * @return the load balancer now cached for the topic.
 */
private SubscriptionLoadBalancer updateSubscriptionLoadBalancer(String topic, TopicRouteData topicRouteData) {
    return subscriptionRouteDataCache.compute(topic, (key, existing) ->
        null == existing ? new SubscriptionLoadBalancer(topicRouteData) : existing.update(topicRouteData));
}
/**
 * Route-update hook: refreshes the cached load balancer of the topic with the new route data.
 *
 * @param topic          topic whose route data changed.
 * @param topicRouteData the new route data.
 */
@Override
public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
    // The refreshed load balancer is already stored in the cache; the return value is not needed here.
    this.updateSubscriptionLoadBalancer(topic, topicRouteData);
}
/**
 * Returns the load balancer of a topic, serving it from the cache when present.
 *
 * <p>On a cache miss the topic's route data is fetched and the load balancer is built (and cached) from it
 * once the route data future completes.
 *
 * @param topic topic to look up.
 * @return a future holding the topic's load balancer.
 */
private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionLoadBalancer(final String topic) {
    final SubscriptionLoadBalancer cached = subscriptionRouteDataCache.get(topic);
    if (null == cached) {
        // Cache miss: derive the load balancer from freshly fetched route data.
        return Futures.transform(getRouteData(topic),
            routeData -> updateSubscriptionLoadBalancer(topic, routeData), MoreExecutors.directExecutor());
    }
    return Futures.immediateFuture(cached);
}
}