| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.eventmesh.runtime.core.protocol.http.consumer; |
| |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import io.openmessaging.api.AsyncConsumeContext; |
| import io.openmessaging.api.AsyncMessageListener; |
| import io.openmessaging.api.Message; |
| import io.openmessaging.api.OnExceptionContext; |
| import io.openmessaging.api.SendCallback; |
| import io.openmessaging.api.SendResult; |
| |
| import org.apache.commons.collections4.MapUtils; |
| import org.apache.eventmesh.api.AbstractContext; |
| import org.apache.eventmesh.api.EventMeshAction; |
| import org.apache.eventmesh.api.EventMeshAsyncConsumeContext; |
| import org.apache.eventmesh.common.Constants; |
| import org.apache.eventmesh.common.protocol.SubscriptionItem; |
| import org.apache.eventmesh.common.protocol.SubscriptionMode; |
| import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; |
| import org.apache.eventmesh.runtime.constants.EventMeshConstants; |
| import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; |
| import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf; |
| import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper; |
| import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer; |
| import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext; |
| import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPMessageHandler; |
| import org.apache.eventmesh.runtime.core.protocol.http.push.MessageHandler; |
| import org.apache.eventmesh.runtime.util.EventMeshUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class EventMeshConsumer { |
| |
| private EventMeshHTTPServer eventMeshHTTPServer; |
| |
| private AtomicBoolean started4Persistent = new AtomicBoolean(Boolean.FALSE); |
| |
| private AtomicBoolean started4Broadcast = new AtomicBoolean(Boolean.FALSE); |
| |
| private AtomicBoolean inited4Persistent = new AtomicBoolean(Boolean.FALSE); |
| |
| private AtomicBoolean inited4Broadcast = new AtomicBoolean(Boolean.FALSE); |
| |
| public Logger logger = LoggerFactory.getLogger(this.getClass()); |
| |
| public Logger messageLogger = LoggerFactory.getLogger("message"); |
| |
| private ConsumerGroupConf consumerGroupConf; |
| |
| private MQConsumerWrapper persistentMqConsumer; |
| |
| private MQConsumerWrapper broadcastMqConsumer; |
| |
| public EventMeshConsumer(EventMeshHTTPServer eventMeshHTTPServer, ConsumerGroupConf consumerGroupConf) { |
| this.eventMeshHTTPServer = eventMeshHTTPServer; |
| this.consumerGroupConf = consumerGroupConf; |
| this.persistentMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType); |
| this.broadcastMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType); |
| } |
| |
| private MessageHandler httpMessageHandler; |
| |
| public synchronized void init() throws Exception { |
| httpMessageHandler = new HTTPMessageHandler(this); |
| Properties keyValue = new Properties(); |
| keyValue.put("isBroadcast", "false"); |
| keyValue.put("consumerGroup", consumerGroupConf.getConsumerGroup()); |
| keyValue.put("eventMeshIDC", eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC); |
| keyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(), |
| eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster)); |
| persistentMqConsumer.init(keyValue); |
| |
| // |
| Properties broadcastKeyValue = new Properties(); |
| broadcastKeyValue.put("isBroadcast", "true"); |
| broadcastKeyValue.put("consumerGroup", consumerGroupConf.getConsumerGroup()); |
| broadcastKeyValue.put("eventMeshIDC", eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC); |
| broadcastKeyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(), |
| eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster)); |
| broadcastMqConsumer.init(broadcastKeyValue); |
| inited4Persistent.compareAndSet(false, true); |
| inited4Broadcast.compareAndSet(false, true); |
| logger.info("EventMeshConsumer [{}] inited.............", consumerGroupConf.getConsumerGroup()); |
| } |
| |
| public synchronized void start() throws Exception { |
| persistentMqConsumer.start(); |
| started4Persistent.compareAndSet(false, true); |
| broadcastMqConsumer.start(); |
| started4Broadcast.compareAndSet(false, true); |
| } |
| |
| public void subscribe(String topic, SubscriptionItem subscriptionItem) throws Exception { |
| AsyncMessageListener listener = null; |
| if (!SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) { |
| listener = new AsyncMessageListener() { |
| @Override |
| public void consume(Message message, AsyncConsumeContext context) { |
| String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION); |
| String bizSeqNo = message.getSystemProperties(Constants.PROPERTY_MESSAGE_SEARCH_KEYS); |
| String uniqueId = message.getUserProperties(Constants.RMB_UNIQ_ID); |
| |
| message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())); |
| if (messageLogger.isDebugEnabled()) { |
| messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, message); |
| } else { |
| messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId); |
| } |
| |
| ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), topic, null); |
| EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context; |
| |
| if (currentTopicConfig == null) { |
| logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic); |
| try { |
| sendMessageBack(message, uniqueId, bizSeqNo); |
| // context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); |
| // context.ack(); |
| eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); |
| return; |
| } catch (Exception ex) { |
| } |
| } |
| HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, |
| topic, message, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); |
| |
| if (httpMessageHandler.handle(handleMsgContext)) { |
| // context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name()); |
| // context.ack(); |
| eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); |
| } else { |
| try { |
| sendMessageBack(message, uniqueId, bizSeqNo); |
| } catch (Exception e) { |
| |
| } |
| // context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); |
| // context.ack(); |
| eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); |
| } |
| } |
| }; |
| persistentMqConsumer.subscribe(topic, listener); |
| } else { |
| listener = new AsyncMessageListener() { |
| @Override |
| public void consume(Message message, AsyncConsumeContext context) { |
| String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION); |
| String bizSeqNo = message.getSystemProperties(Constants.PROPERTY_MESSAGE_SEARCH_KEYS); |
| String uniqueId = message.getUserProperties(Constants.RMB_UNIQ_ID); |
| |
| message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())); |
| |
| if (messageLogger.isDebugEnabled()) { |
| messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, message); |
| } else { |
| messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId); |
| } |
| |
| ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), topic, null); |
| EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context; |
| |
| if (currentTopicConfig == null) { |
| logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic); |
| try { |
| sendMessageBack(message, uniqueId, bizSeqNo); |
| // context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); |
| // context.ack(); |
| eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); |
| return; |
| } catch (Exception ex) { |
| } |
| } |
| HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this, |
| topic, message, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig); |
| |
| if (httpMessageHandler.handle(handleMsgContext)) { |
| // context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name()); |
| // context.ack(); |
| eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck); |
| } else { |
| try { |
| sendMessageBack(message, uniqueId, bizSeqNo); |
| } catch (Exception e) { |
| |
| } |
| // context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); |
| // context.ack(); |
| eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage); |
| } |
| } |
| }; |
| broadcastMqConsumer.subscribe(topic, listener); |
| } |
| } |
| |
| public void unsubscribe(String topic, SubscriptionMode subscriptionMode) throws Exception { |
| if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) { |
| broadcastMqConsumer.unsubscribe(topic); |
| } else { |
| persistentMqConsumer.unsubscribe(topic); |
| } |
| } |
| |
| // public boolean isPause() { |
| // return persistentMqConsumer.isPause() && broadcastMqConsumer.isPause(); |
| // } |
| // |
| // public void pause() { |
| // persistentMqConsumer.pause(); |
| // broadcastMqConsumer.pause(); |
| // } |
| |
| public synchronized void shutdown() throws Exception { |
| persistentMqConsumer.shutdown(); |
| started4Persistent.compareAndSet(true, false); |
| broadcastMqConsumer.shutdown(); |
| started4Broadcast.compareAndSet(true, false); |
| } |
| |
| public void updateOffset(String topic, SubscriptionMode subscriptionMode, List<Message> msgs, AbstractContext context) { |
| if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) { |
| broadcastMqConsumer.updateOffset(msgs, context); |
| } else { |
| persistentMqConsumer.updateOffset(msgs, context); |
| } |
| } |
| |
| public ConsumerGroupConf getConsumerGroupConf() { |
| return consumerGroupConf; |
| } |
| |
| public EventMeshHTTPServer getEventMeshHTTPServer() { |
| return eventMeshHTTPServer; |
| } |
| |
| public void sendMessageBack(final Message msgBack, final String uniqueId, String bizSeqNo) throws Exception { |
| |
| EventMeshProducer sendMessageBack |
| = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup()); |
| |
| if (sendMessageBack == null) { |
| logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}", consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId); |
| return; |
| } |
| |
| final SendMessageContext sendMessageBackContext = new SendMessageContext(bizSeqNo, msgBack, sendMessageBack, eventMeshHTTPServer); |
| |
| sendMessageBack.send(sendMessageBackContext, new SendCallback() { |
| @Override |
| public void onSuccess(SendResult sendResult) { |
| } |
| |
| @Override |
| public void onException(OnExceptionContext context) { |
| logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}", consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId); |
| } |
| |
| // @Override |
| // public void onException(Throwable e) { |
| // logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}", consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId); |
| // } |
| }); |
| } |
| } |