blob: 31fd31b5988bd7708c9d54794dbd470dbb654692 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eventmesh.runtime.core.protocol.http.consumer;
import static org.apache.eventmesh.runtime.constants.EventMeshConstants.CONSUMER_GROUP;
import static org.apache.eventmesh.runtime.constants.EventMeshConstants.EVENT_MESH_IDC;
import static org.apache.eventmesh.runtime.constants.EventMeshConstants.INSTANCE_NAME;
import static org.apache.eventmesh.runtime.constants.EventMeshConstants.IS_BROADCAST;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPMessageHandler;
import org.apache.eventmesh.runtime.core.protocol.http.push.MessageHandler;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.TraceUtils;
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
import org.apache.commons.collections4.MapUtils;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.opentelemetry.api.trace.Span;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class EventMeshConsumer {
private final EventMeshHTTPServer eventMeshHTTPServer;
private final AtomicBoolean started4Persistent = new AtomicBoolean(Boolean.FALSE);
private final AtomicBoolean started4Broadcast = new AtomicBoolean(Boolean.FALSE);
private final AtomicBoolean inited4Persistent = new AtomicBoolean(Boolean.FALSE);
private final AtomicBoolean inited4Broadcast = new AtomicBoolean(Boolean.FALSE);
public final Logger messageLogger = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
private ConsumerGroupConf consumerGroupConf;
private final MQConsumerWrapper persistentMqConsumer;
private final MQConsumerWrapper broadcastMqConsumer;
public EventMeshConsumer(EventMeshHTTPServer eventMeshHTTPServer, ConsumerGroupConf consumerGroupConf) {
this.eventMeshHTTPServer = eventMeshHTTPServer;
this.consumerGroupConf = consumerGroupConf;
this.persistentMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshStoragePluginType());
this.broadcastMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshStoragePluginType());
}
private MessageHandler httpMessageHandler;
public synchronized void init() throws Exception {
httpMessageHandler = new HTTPMessageHandler(this);
Properties keyValue = new Properties();
keyValue.put(IS_BROADCAST, "false");
keyValue.put(CONSUMER_GROUP, consumerGroupConf.getConsumerGroup());
keyValue.put(EVENT_MESH_IDC, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
keyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
persistentMqConsumer.init(keyValue);
EventListener clusterEventListener = (event, context) -> {
String protocolVersion =
Objects.requireNonNull(event.getSpecVersion()).toString();
Span span = TraceUtils.prepareServerSpan(
EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
try {
String topic = event.getSubject();
String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO)).toString();
String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID)).toString();
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
.build();
if (messageLogger.isDebugEnabled()) {
messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
} else {
messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}", topic, bizSeqNo, uniqueId);
}
ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
topic, null);
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
if (currentTopicConfig == null) {
log.error("no topicConfig found, consumerGroup:{} topic:{}",
consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(event, uniqueId, bizSeqNo);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
//ignore
}
}
SubscriptionItem subscriptionItem =
consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
HandleMsgContext handleMsgContext = new HandleMsgContext(
EventMeshUtil.buildPushMsgSeqNo(),
consumerGroupConf.getConsumerGroup(),
EventMeshConsumer.this,
topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
sendMessageBack(event, uniqueId, bizSeqNo);
} catch (Exception e) {
//ignore
}
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
} finally {
TraceUtils.finishSpan(span, event);
}
};
persistentMqConsumer.registerEventListener(clusterEventListener);
//broadcast consumer
Properties broadcastKeyValue = new Properties();
broadcastKeyValue.put(IS_BROADCAST, "true");
broadcastKeyValue.put(CONSUMER_GROUP, consumerGroupConf.getConsumerGroup());
broadcastKeyValue.put(EVENT_MESH_IDC, eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
broadcastKeyValue.put(INSTANCE_NAME, EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()));
broadcastMqConsumer.init(broadcastKeyValue);
EventListener broadcastEventListener = (event, context) -> {
String protocolVersion =
Objects.requireNonNull(event.getSpecVersion()).toString();
Span span = TraceUtils.prepareServerSpan(
EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
try {
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
.build();
String topic = event.getSubject();
String bizSeqNo = getEventExtension(event, ProtocolKey.ClientInstanceKey.BIZSEQNO);
String uniqueId = getEventExtension(event, ProtocolKey.ClientInstanceKey.UNIQUEID);
if (messageLogger.isDebugEnabled()) {
messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
} else {
messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}",
topic, bizSeqNo,
uniqueId);
}
ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(
consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
(EventMeshAsyncConsumeContext) context;
if (currentTopicConfig == null) {
log.error("no topicConfig found, consumerGroup:{} topic:{}",
consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(event, uniqueId, bizSeqNo);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
//ignore
}
}
SubscriptionItem subscriptionItem =
consumerGroupConf.getConsumerGroupTopicConf().get(topic)
.getSubscriptionItem();
HandleMsgContext handleMsgContext =
new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
topic, event, subscriptionItem,
eventMeshAsyncConsumeContext.getAbstractContext(),
consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId,
currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
sendMessageBack(event, uniqueId, bizSeqNo);
} catch (Exception e) {
//ignore
}
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
} finally {
TraceUtils.finishSpan(span, event);
}
};
broadcastMqConsumer.registerEventListener(broadcastEventListener);
inited4Persistent.compareAndSet(false, true);
inited4Broadcast.compareAndSet(false, true);
log.info("EventMeshConsumer [{}] inited.............", consumerGroupConf.getConsumerGroup());
}
private String getEventExtension(CloudEvent event, String protocolKey) {
Object extension = event.getExtension(protocolKey);
return Objects.isNull(extension) ? "" : extension.toString();
}
public synchronized void start() throws Exception {
persistentMqConsumer.start();
started4Persistent.compareAndSet(false, true);
broadcastMqConsumer.start();
started4Broadcast.compareAndSet(false, true);
}
public void subscribe(String topic, SubscriptionItem subscriptionItem) throws Exception {
if (SubscriptionMode.BROADCASTING != subscriptionItem.getMode()) {
persistentMqConsumer.subscribe(topic);
} else {
broadcastMqConsumer.subscribe(topic);
}
}
public void unsubscribe(String topic, SubscriptionMode subscriptionMode) throws Exception {
if (SubscriptionMode.BROADCASTING == subscriptionMode) {
broadcastMqConsumer.unsubscribe(topic);
} else {
persistentMqConsumer.unsubscribe(topic);
}
}
public synchronized void shutdown() throws Exception {
persistentMqConsumer.shutdown();
started4Persistent.compareAndSet(true, false);
broadcastMqConsumer.shutdown();
started4Broadcast.compareAndSet(true, false);
}
public void updateOffset(String topic, SubscriptionMode subscriptionMode, List<CloudEvent> events,
AbstractContext context) {
if (SubscriptionMode.BROADCASTING == subscriptionMode) {
broadcastMqConsumer.updateOffset(events, context);
} else {
persistentMqConsumer.updateOffset(events, context);
}
}
public ConsumerGroupConf getConsumerGroupConf() {
return consumerGroupConf;
}
public void setConsumerGroupConf(ConsumerGroupConf consumerGroupConf) {
this.consumerGroupConf = consumerGroupConf;
}
public EventMeshHTTPServer getEventMeshHTTPServer() {
return eventMeshHTTPServer;
}
public void sendMessageBack(final CloudEvent event, final String uniqueId, String bizSeqNo) throws Exception {
EventMeshProducer sendMessageBack
= eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup());
if (sendMessageBack == null) {
log.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}",
consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
return;
}
final SendMessageContext sendMessageBackContext = new SendMessageContext(bizSeqNo, event, sendMessageBack,
eventMeshHTTPServer);
sendMessageBack.send(sendMessageBackContext, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(OnExceptionContext context) {
log.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
}
});
}
}