blob: ba913fa57953c9b3a5c355dd388bc99d4520bb60 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eventmesh.runtime.core.protocol.grpc.consumer;
import static org.apache.eventmesh.runtime.constants.EventMeshConstants.CONSUMER_GROUP;
import static org.apache.eventmesh.runtime.constants.EventMeshConstants.EVENT_MESH_IDC;
import static org.apache.eventmesh.runtime.constants.EventMeshConstants.INSTANCE_NAME;
import static org.apache.eventmesh.runtime.constants.EventMeshConstants.IS_BROADCAST;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.utils.ThreadUtils;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupTopicConfig;
import org.apache.eventmesh.runtime.core.protocol.grpc.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.grpc.producer.SendMessageContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.push.HandleMsgContext;
import org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.commons.collections4.MapUtils;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class EventMeshConsumer {
private final transient String consumerGroup;
private final transient EventMeshGrpcServer eventMeshGrpcServer;
private final transient EventMeshGrpcConfiguration eventMeshGrpcConfiguration;
private final transient MQConsumerWrapper persistentMqConsumer;
private final transient MQConsumerWrapper broadcastMqConsumer;
private final transient MessageHandler messageHandler;
private transient ServiceState serviceState;
/**
* Key: topic Value: ConsumerGroupTopicConfig
**/
private final transient Map<String, ConsumerGroupTopicConfig> consumerGroupTopicConfig = new ConcurrentHashMap<>();
public EventMeshConsumer(final EventMeshGrpcServer eventMeshGrpcServer, final String consumerGroup) {
this.eventMeshGrpcServer = eventMeshGrpcServer;
this.eventMeshGrpcConfiguration = eventMeshGrpcServer.getEventMeshGrpcConfiguration();
this.consumerGroup = consumerGroup;
this.messageHandler = new MessageHandler(consumerGroup, eventMeshGrpcServer.getPushMsgExecutor());
this.persistentMqConsumer = new MQConsumerWrapper(eventMeshGrpcConfiguration.getEventMeshStoragePluginType());
this.broadcastMqConsumer = new MQConsumerWrapper(eventMeshGrpcConfiguration.getEventMeshStoragePluginType());
}
/**
* Register client's topic information and return true if this EventMeshConsumer required restart because of the topic changes
*
* @param client ConsumerGroupClient
* @return true if the underlining EventMeshConsumer needs to restart later; false otherwise
*/
public synchronized boolean registerClient(final ConsumerGroupClient client) {
boolean requireRestart = false;
ConsumerGroupTopicConfig topicConfig = consumerGroupTopicConfig.get(client.getTopic());
if (topicConfig == null) {
topicConfig = ConsumerGroupTopicConfig.buildTopicConfig(consumerGroup, client.getTopic(),
client.getSubscriptionMode(), client.getGrpcType());
consumerGroupTopicConfig.put(client.getTopic(), topicConfig);
requireRestart = true;
}
topicConfig.registerClient(client);
return requireRestart;
}
/**
* Deregister client's topic information and return true if this EventMeshConsumer required restart because of the topic changes
*
* @param client ConsumerGroupClient
* @return true if the underlining EventMeshConsumer needs to restart later; false otherwise
*/
public synchronized boolean deregisterClient(final ConsumerGroupClient client) {
boolean requireRestart = false;
final ConsumerGroupTopicConfig topicConfig = consumerGroupTopicConfig.get(client.getTopic());
if (topicConfig != null) {
topicConfig.deregisterClient(client);
if (topicConfig.getSize() == 0) {
consumerGroupTopicConfig.remove(client.getTopic());
requireRestart = true;
}
}
return requireRestart;
}
public synchronized void init() throws Exception {
if (MapUtils.isEmpty(consumerGroupTopicConfig)) {
// no topics, don't init the consumer
return;
}
final Properties keyValue = new Properties();
keyValue.put(IS_BROADCAST, "false");
keyValue.put(CONSUMER_GROUP, consumerGroup);
keyValue.put(EVENT_MESH_IDC, eventMeshGrpcConfiguration.getEventMeshIDC());
keyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroup,
eventMeshGrpcConfiguration.getEventMeshCluster()));
persistentMqConsumer.init(keyValue);
persistentMqConsumer.registerEventListener(createEventListener(SubscriptionMode.CLUSTERING));
final Properties broadcastKeyValue = new Properties();
broadcastKeyValue.put(IS_BROADCAST, "true");
broadcastKeyValue.put(CONSUMER_GROUP, consumerGroup);
broadcastKeyValue.put(EVENT_MESH_IDC, eventMeshGrpcConfiguration.getEventMeshIDC());
broadcastKeyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroup,
eventMeshGrpcConfiguration.getEventMeshCluster()));
broadcastMqConsumer.init(broadcastKeyValue);
broadcastMqConsumer.registerEventListener(createEventListener(SubscriptionMode.BROADCASTING));
serviceState = ServiceState.INITED;
if (log.isInfoEnabled()) {
log.info("EventMeshConsumer [{}] initialized.............", consumerGroup);
}
}
public synchronized void start() throws Exception {
if (MapUtils.isEmpty(consumerGroupTopicConfig)) {
// no topics, don't start the consumer
return;
}
consumerGroupTopicConfig.forEach((k, v) -> {
try {
subscribe(k, v.getSubscriptionMode());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
persistentMqConsumer.start();
broadcastMqConsumer.start();
serviceState = ServiceState.RUNNING;
if (log.isInfoEnabled()) {
log.info("EventMeshConsumer [{}] started..........", consumerGroup);
}
}
public synchronized void shutdown() throws Exception {
persistentMqConsumer.shutdown();
broadcastMqConsumer.shutdown();
serviceState = ServiceState.STOPPED;
if (log.isInfoEnabled()) {
log.info("EventMeshConsumer [{}] shutdown.........", consumerGroup);
}
}
public ServiceState getStatus() {
return serviceState;
}
public void subscribe(final String topic, final SubscriptionMode subscriptionMode) throws Exception {
if (SubscriptionMode.CLUSTERING == subscriptionMode) {
persistentMqConsumer.subscribe(topic);
} else if (SubscriptionMode.BROADCASTING == subscriptionMode) {
broadcastMqConsumer.subscribe(topic);
} else {
//log.error("Subscribe Failed. Incorrect Subscription Mode");
throw new Exception("Subscribe Failed. Incorrect Subscription Mode");
}
}
public void unsubscribe(final SubscriptionItem subscriptionItem) throws Exception {
final SubscriptionMode mode = subscriptionItem.getMode();
final String topic = subscriptionItem.getTopic();
if (SubscriptionMode.CLUSTERING == mode) {
persistentMqConsumer.unsubscribe(topic);
} else if (SubscriptionMode.BROADCASTING == mode) {
broadcastMqConsumer.unsubscribe(topic);
} else {
throw new Exception("Unsubscribe Failed. Incorrect Subscription Mode");
}
}
public void updateOffset(final SubscriptionMode subscriptionMode, final List<CloudEvent> events, final AbstractContext context)
throws Exception {
if (SubscriptionMode.CLUSTERING == subscriptionMode) {
persistentMqConsumer.updateOffset(events, context);
} else if (SubscriptionMode.BROADCASTING == subscriptionMode) {
broadcastMqConsumer.updateOffset(events, context);
} else {
throw new Exception("Subscribe Failed. Incorrect Subscription Mode");
}
}
private EventListener createEventListener(final SubscriptionMode subscriptionMode) {
return (event, context) -> {
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.build();
final String topic = event.getSubject();
final String bizSeqNo = Optional.ofNullable(
(String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS))
.orElseGet(() -> "");
final String uniqueId = Optional.ofNullable((String) event.getExtension(Constants.RMB_UNIQ_ID))
.orElseGet(() -> "");
if (log.isDebugEnabled()) {
log.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
} else {
if (log.isInfoEnabled()) {
log.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic,
bizSeqNo, uniqueId);
}
eventMeshGrpcServer.getMetricsMonitor().recordReceiveMsgFromQueue();
}
final EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
final ConsumerGroupTopicConfig topicConfig = consumerGroupTopicConfig.get(topic);
if (topicConfig != null) {
final HandleMsgContext handleMsgContext =
new HandleMsgContext(consumerGroup, event, subscriptionMode, topicConfig.getGrpcType(),
eventMeshAsyncConsumeContext.getAbstractContext(), eventMeshGrpcServer,
this,
topicConfig);
if (messageHandler.handle(handleMsgContext)) {
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
return;
} else {
// can not handle the message due to the capacity limit is reached
// wait for some time and send this message back to mq and consume again
try {
ThreadUtils.sleep(5, TimeUnit.SECONDS);
sendMessageBack(consumerGroup, event, uniqueId, bizSeqNo);
} catch (Exception ignored) {
// ignore exception
}
}
} else {
if (log.isDebugEnabled()) {
log.debug("no active consumer for topic={}|msg={}", topic, event);
}
}
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
};
}
public void sendMessageBack(final String consumerGroup, final CloudEvent event,
final String uniqueId, final String bizSeqNo) throws Exception {
final EventMeshProducer producer
= eventMeshGrpcServer.getProducerManager().getEventMeshProducer(consumerGroup);
if (producer == null) {
if (log.isWarnEnabled()) {
log.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}",
consumerGroup, bizSeqNo, uniqueId);
}
return;
}
final SendMessageContext sendMessageBackContext = new SendMessageContext(bizSeqNo, event,
producer, eventMeshGrpcServer);
producer.send(sendMessageBackContext, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
}
@Override
public void onException(final OnExceptionContext context) {
if (log.isWarnEnabled()) {
log.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}", consumerGroup,
bizSeqNo, uniqueId);
}
}
});
}
}