blob: ac45ca6cb3bfdd70b08dc2b5ce6fb1677d4d5b56 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eventmesh.runtime.core.protocol.tcp.client.group;
import org.apache.eventmesh.api.AsyncConsumeContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.EventMeshTcpRetryer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext;
import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.HttpTinyClient;
import org.apache.eventmesh.runtime.util.TraceUtils;
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.opentelemetry.api.trace.Span;
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ClientGroupWrapper {
private final String sysId;
private String group;
private EventMeshTCPConfiguration eventMeshTCPConfiguration;
private final EventMeshTCPServer eventMeshTCPServer;
private EventMeshTcpRetryer eventMeshTcpRetryer;
private EventMeshTcpMonitor eventMeshTcpMonitor;
private DownstreamDispatchStrategy downstreamDispatchStrategy;
private final ReadWriteLock groupLock = new ReentrantReadWriteLock();
public Set<Session> groupConsumerSessions = new HashSet<Session>();
public Set<Session> groupProducerSessions = new HashSet<Session>();
public AtomicBoolean started4Persistent = new AtomicBoolean(Boolean.FALSE);
public AtomicBoolean started4Broadcast = new AtomicBoolean(Boolean.FALSE);
public AtomicBoolean inited4Persistent = new AtomicBoolean(Boolean.FALSE);
public AtomicBoolean inited4Broadcast = new AtomicBoolean(Boolean.FALSE);
private MQConsumerWrapper persistentMsgConsumer;
private MQConsumerWrapper broadCastMsgConsumer;
private final ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping =
new ConcurrentHashMap<String, Set<Session>>();
private final ConcurrentHashMap<String, SubscriptionItem> subscriptions = new ConcurrentHashMap<>();
public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
private final MQProducerWrapper mqProducerWrapper;
public ClientGroupWrapper(String sysId, String group,
EventMeshTCPServer eventMeshTCPServer,
DownstreamDispatchStrategy downstreamDispatchStrategy) {
this.sysId = sysId;
this.group = group;
this.eventMeshTCPServer = eventMeshTCPServer;
this.eventMeshTCPConfiguration = eventMeshTCPServer.getEventMeshTCPConfiguration();
this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer();
this.eventMeshTcpMonitor =
Preconditions.checkNotNull(eventMeshTCPServer.getEventMeshTcpMonitor());
this.downstreamDispatchStrategy = downstreamDispatchStrategy;
this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType());
this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType());
this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType());
}
public ConcurrentHashMap<String, Set<Session>> getTopic2sessionInGroupMapping() {
return topic2sessionInGroupMapping;
}
public boolean hasSubscription(String topic) {
boolean has = false;
try {
this.groupLock.readLock().lockInterruptibly();
has = topic2sessionInGroupMapping.containsKey(topic);
} catch (Exception e) {
log.error("hasSubscription error! topic[{}]", topic);
} finally {
this.groupLock.readLock().unlock();
}
return has;
}
public boolean send(UpStreamMsgContext upStreamMsgContext, SendCallback sendCallback)
throws Exception {
mqProducerWrapper.send(upStreamMsgContext.getEvent(), sendCallback);
return true;
}
public void request(UpStreamMsgContext upStreamMsgContext, RequestReplyCallback rrCallback,
long timeout)
throws Exception {
mqProducerWrapper.request(upStreamMsgContext.getEvent(), rrCallback, timeout);
}
public boolean reply(UpStreamMsgContext upStreamMsgContext) throws Exception {
mqProducerWrapper.reply(upStreamMsgContext.getEvent(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(OnExceptionContext context) {
String bizSeqNo = (String) upStreamMsgContext.getEvent()
.getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS);
log.error("reply err! topic:{}, bizSeqNo:{}, client:{}",
upStreamMsgContext.getEvent().getSubject(), bizSeqNo,
upStreamMsgContext.getSession().getClient(), context.getException());
}
});
return true;
}
public MQProducerWrapper getMqProducerWrapper() {
return mqProducerWrapper;
}
public boolean addSubscription(SubscriptionItem subscriptionItem, Session session)
throws Exception {
if (subscriptionItem == null) {
log.error("addSubscription param error,subscriptionItem is null, session:{}", session);
return false;
}
String topic = subscriptionItem.getTopic();
if (session == null || !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
log.error("addSubscription param error,topic:{},session:{}", topic, session);
return false;
}
boolean r;
try {
this.groupLock.writeLock().lockInterruptibly();
if (!topic2sessionInGroupMapping.containsKey(topic)) {
Set<Session> sessions = new HashSet<Session>();
topic2sessionInGroupMapping.put(topic, sessions);
}
r = topic2sessionInGroupMapping.get(topic).add(session);
if (r) {
if (log.isInfoEnabled()) {
log.info("addSubscription success, group:{} topic:{} client:{}", group,
topic, session.getClient());
}
} else {
if (log.isWarnEnabled()) {
log.warn("addSubscription fail, group:{} topic:{} client:{}", group, topic,
session.getClient());
}
}
subscriptions.putIfAbsent(topic, subscriptionItem);
} catch (Exception e) {
log.error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
throw new Exception("addSubscription fail");
} finally {
this.groupLock.writeLock().unlock();
}
return r;
}
public boolean removeSubscription(SubscriptionItem subscriptionItem, Session session) {
if (subscriptionItem == null) {
log.error("addSubscription param error,subscriptionItem is null, session:{}", session);
return false;
}
String topic = subscriptionItem.getTopic();
if (session == null
|| !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
log.error("removeSubscription param error,topic:{},session:{}", topic, session);
return false;
}
boolean r = false;
try {
this.groupLock.writeLock().lockInterruptibly();
if (topic2sessionInGroupMapping.containsKey(topic)) {
r = topic2sessionInGroupMapping.get(topic).remove(session);
if (r) {
if (log.isInfoEnabled()) {
log.info(
"removeSubscription remove session success, group:{} topic:{} client:{}",
group, topic, session.getClient());
}
} else {
if (log.isWarnEnabled()) {
log.warn(
"removeSubscription remove session failed, group:{} topic:{} client:{}",
group, topic, session.getClient());
}
}
}
if (CollectionUtils.size(topic2sessionInGroupMapping.get(topic)) == 0) {
topic2sessionInGroupMapping.remove(topic);
subscriptions.remove(topic);
log.info("removeSubscription remove topic success, group:{} topic:{}",
group, topic);
}
} catch (Exception e) {
log.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(),
e);
} finally {
this.groupLock.writeLock().unlock();
}
return r;
}
public synchronized void startClientGroupProducer() throws Exception {
if (producerStarted.get()) {
return;
}
Properties keyValue = new Properties();
keyValue.put(EventMeshConstants.PRODUCER_GROUP, group);
keyValue.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil
.buildMeshTcpClientID(sysId, EventMeshConstants.PURPOSE_PUB_UPPER_CASE,
eventMeshTCPConfiguration.getEventMeshCluster()));
//TODO for defibus
keyValue.put(EventMeshConstants.EVENT_MESH_IDC, eventMeshTCPConfiguration.getEventMeshIDC());
mqProducerWrapper.init(keyValue);
mqProducerWrapper.start();
producerStarted.compareAndSet(false, true);
log.info("starting producer success, group:{}", group);
}
public synchronized void shutdownProducer() throws Exception {
if (!producerStarted.get()) {
return;
}
mqProducerWrapper.shutdown();
producerStarted.compareAndSet(true, false);
log.info("shutdown producer success for group:{}", group);
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public boolean addGroupConsumerSession(Session session) {
if (session == null
|| !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
log.error("addGroupConsumerSession param error,session:{}", session);
return false;
}
boolean r = false;
try {
this.groupLock.writeLock().lockInterruptibly();
r = groupConsumerSessions.add(session);
if (r) {
if (log.isInfoEnabled()) {
log.info("addGroupConsumerSession success, group:{} client:{}", group,
session.getClient());
}
}
} catch (Exception e) {
log.error("addGroupConsumerSession error! group:{} client:{}", group,
session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
return r;
}
public boolean addGroupProducerSession(Session session) {
if (session == null
|| !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
log.error("addGroupProducerSession param error,session:{}", session);
return false;
}
boolean r = false;
try {
this.groupLock.writeLock().lockInterruptibly();
r = groupProducerSessions.add(session);
if (r) {
log.info("addGroupProducerSession success, group:{} client:{}", group,
session.getClient());
}
} catch (Exception e) {
log.error("addGroupProducerSession error! group:{} client:{}", group,
session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
return r;
}
public boolean removeGroupConsumerSession(Session session) {
if (session == null
|| !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
log.error("removeGroupConsumerSession param error,session:{}", session);
return false;
}
boolean r = false;
try {
this.groupLock.writeLock().lockInterruptibly();
r = groupConsumerSessions.remove(session);
if (r) {
if (log.isInfoEnabled()) {
log.info("removeGroupConsumerSession success, group:{} client:{}", group,
session.getClient());
}
}
} catch (Exception e) {
log.error("removeGroupConsumerSession error! group:{} client:{}", group,
session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
return r;
}
public boolean removeGroupProducerSession(Session session) {
if (session == null
|| !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
log.error("removeGroupProducerSession param error,session:{}", session);
return false;
}
boolean r = false;
try {
this.groupLock.writeLock().lockInterruptibly();
r = groupProducerSessions.remove(session);
if (r) {
log.info("removeGroupProducerSession success, group:{} client:{}", group,
session.getClient());
}
} catch (Exception e) {
log.error("removeGroupProducerSession error! group:{} client:{}", group,
session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
return r;
}
public synchronized void initClientGroupPersistentConsumer() throws Exception {
if (inited4Persistent.get()) {
return;
}
Properties keyValue = new Properties();
keyValue.put(EventMeshConstants.IS_BROADCAST, "false");
keyValue.put(EventMeshConstants.CONSUMER_GROUP, group);
keyValue.put(EventMeshConstants.EVENT_MESH_IDC, eventMeshTCPConfiguration.getEventMeshIDC());
keyValue.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil
.buildMeshTcpClientID(sysId, EventMeshConstants.PURPOSE_SUB_UPPER_CASE,
eventMeshTCPConfiguration.getEventMeshCluster()));
persistentMsgConsumer.init(keyValue);
EventListener listener = (CloudEvent event, AsyncConsumeContext context) -> {
String protocolVersion =
Objects.requireNonNull(event.getSpecVersion()).toString();
Span span = TraceUtils.prepareServerSpan(
EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
try {
eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
.incrementAndGet();
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
eventMeshTCPConfiguration.getEventMeshServerIp()).build();
String topic = event.getSubject();
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
(EventMeshAsyncConsumeContext) context;
Session session = downstreamDispatchStrategy
.select(group, topic, groupConsumerSessions);
String bizSeqNo = EventMeshUtil.getMessageBizSeq(event);
if (session == null) {
try {
Integer sendBackTimes = 0;
String sendBackFromEventMeshIp = "";
if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) {
sendBackTimes = (Integer) event.getExtension(
EventMeshConstants.EVENTMESH_SEND_BACK_TIMES);
}
if (StringUtils.isNotBlank(Objects.requireNonNull(event.getExtension(
EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString())) {
sendBackFromEventMeshIp = (String) event.getExtension(
EventMeshConstants.EVENTMESH_SEND_BACK_IP);
}
log.error(
"found no session to downstream msg,groupName:{}, topic:{}, "
+ "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
group, topic, bizSeqNo, sendBackTimes,
sendBackFromEventMeshIp);
int eventMeshTcpSendBackMaxTimes = eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpSendBackMaxTimes();
if (Objects.requireNonNull(sendBackTimes) >= eventMeshTcpSendBackMaxTimes) {
log.error("sendBack to broker over max times:{}, groupName:{}, topic:{}, " + "bizSeqNo:{}", eventMeshTcpSendBackMaxTimes,
group, topic, bizSeqNo);
} else {
sendBackTimes++;
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES,
sendBackTimes.toString())
.withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP,
eventMeshTCPConfiguration.getEventMeshServerIp()).build();
sendMsgBackToBroker(event, bizSeqNo);
}
} catch (Exception e) {
log.warn("handle msg exception when no session found", e);
}
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
}
SubscriptionItem subscriptionItem = subscriptions.get(topic);
DownStreamMsgContext downStreamMsgContext =
new DownStreamMsgContext(event, session, persistentMsgConsumer,
eventMeshAsyncConsumeContext.getAbstractContext(), false,
subscriptionItem);
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} finally {
TraceUtils.finishSpan(span, event);
}
};
persistentMsgConsumer.registerEventListener(listener);
inited4Persistent.compareAndSet(false, true);
if (log.isInfoEnabled()) {
log.info("init persistentMsgConsumer success, group:{}", group);
}
}
public synchronized void startClientGroupPersistentConsumer() throws Exception {
if (started4Persistent.get()) {
return;
}
persistentMsgConsumer.start();
started4Persistent.compareAndSet(false, true);
if (log.isInfoEnabled()) {
log.info("starting persistentMsgConsumer success, group:{}", group);
}
}
public synchronized void initClientGroupBroadcastConsumer() throws Exception {
if (inited4Broadcast.get()) {
return;
}
Properties keyValue = new Properties();
keyValue.put(EventMeshConstants.IS_BROADCAST, "true");
keyValue.put(EventMeshConstants.CONSUMER_GROUP, group);
keyValue.put(EventMeshConstants.EVENT_MESH_IDC, eventMeshTCPConfiguration.getEventMeshIDC());
keyValue.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil
.buildMeshTcpClientID(sysId, EventMeshConstants.PURPOSE_SUB_UPPER_CASE,
eventMeshTCPConfiguration.getEventMeshCluster()));
broadCastMsgConsumer.init(keyValue);
EventListener listener = (event, context) -> {
String protocolVersion =
Objects.requireNonNull(event.getSpecVersion()).toString();
Span span = TraceUtils.prepareServerSpan(
EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
try {
eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum()
.incrementAndGet();
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
eventMeshTCPConfiguration.getEventMeshServerIp()).build();
String topic = event.getSubject();
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
(EventMeshAsyncConsumeContext) context;
if (CollectionUtils.isEmpty(groupConsumerSessions)) {
if (log.isWarnEnabled()) {
log.warn("found no session to downstream broadcast msg");
}
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
}
Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
SubscriptionItem subscriptionItem = subscriptions.get(topic);
DownStreamMsgContext downStreamMsgContext =
new DownStreamMsgContext(event, null, broadCastMsgConsumer,
eventMeshAsyncConsumeContext.getAbstractContext(), false,
subscriptionItem);
while (sessionsItr.hasNext()) {
Session session = sessionsItr.next();
if (!session.isAvailable(topic)) {
if (log.isWarnEnabled()) {
log.warn("downstream broadcast msg,session is not available,client:{}",
session.getClient());
}
continue;
}
downStreamMsgContext.setSession(session);
//downstream broadcast msg asynchronously
eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
.submit(() -> {
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);
});
}
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} finally {
TraceUtils.finishSpan(span, event);
}
};
broadCastMsgConsumer.registerEventListener(listener);
inited4Broadcast.compareAndSet(false, true);
if (log.isInfoEnabled()) {
log.info("init broadCastMsgConsumer success, group:{}", group);
}
}
public synchronized void startClientGroupBroadcastConsumer() throws Exception {
if (started4Broadcast.get()) {
return;
}
broadCastMsgConsumer.start();
started4Broadcast.compareAndSet(false, true);
log.info("starting broadCastMsgConsumer success, group:{}", group);
}
public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
if (SubscriptionMode.BROADCASTING == subscriptionItem.getMode()) {
broadCastMsgConsumer.subscribe(subscriptionItem.getTopic());
} else {
persistentMsgConsumer.subscribe(subscriptionItem.getTopic());
}
}
public void unsubscribe(SubscriptionItem subscriptionItem) throws Exception {
if (SubscriptionMode.BROADCASTING == subscriptionItem.getMode()) {
broadCastMsgConsumer.unsubscribe(subscriptionItem.getTopic());
} else {
persistentMsgConsumer.unsubscribe(subscriptionItem.getTopic());
}
}
public synchronized void shutdownBroadCastConsumer() throws Exception {
if (started4Broadcast.get()) {
broadCastMsgConsumer.shutdown();
if (log.isInfoEnabled()) {
log.info("broadcast consumer group:{} shutdown...", group);
}
}
started4Broadcast.compareAndSet(true, false);
inited4Broadcast.compareAndSet(true, false);
broadCastMsgConsumer = null;
}
public synchronized void shutdownPersistentConsumer() throws Exception {
if (started4Persistent.get()) {
persistentMsgConsumer.shutdown();
if (log.isInfoEnabled()) {
log.info("persistent consumer group:{} shutdown...", group);
}
}
started4Persistent.compareAndSet(true, false);
inited4Persistent.compareAndSet(true, false);
persistentMsgConsumer = null;
}
public Set<Session> getGroupConsumerSessions() {
return groupConsumerSessions;
}
public Set<Session> getGroupProducerSessions() {
return groupProducerSessions;
}
public EventMeshTCPConfiguration getEventMeshTCPConfiguration() {
return eventMeshTCPConfiguration;
}
public void setEventMeshTCPConfiguration(EventMeshTCPConfiguration eventMeshTCPConfiguration) {
this.eventMeshTCPConfiguration = eventMeshTCPConfiguration;
}
public EventMeshTcpRetryer getEventMeshTcpRetryer() {
return eventMeshTcpRetryer;
}
public void setEventMeshTcpRetryer(EventMeshTcpRetryer eventMeshTcpRetryer) {
this.eventMeshTcpRetryer = eventMeshTcpRetryer;
}
public EventMeshTcpMonitor getEventMeshTcpMonitor() {
return eventMeshTcpMonitor;
}
public void setEventMeshTcpMonitor(EventMeshTcpMonitor eventMeshTcpMonitor) {
this.eventMeshTcpMonitor = eventMeshTcpMonitor;
}
public DownstreamDispatchStrategy getDownstreamDispatchStrategy() {
return downstreamDispatchStrategy;
}
public void setDownstreamDispatchStrategy(
DownstreamDispatchStrategy downstreamDispatchStrategy) {
this.downstreamDispatchStrategy = downstreamDispatchStrategy;
}
public String getSysId() {
return sysId;
}
private String pushMsgToEventMesh(CloudEvent msg, String ip, int port) throws Exception {
StringBuilder targetUrl = new StringBuilder();
targetUrl.append("http://").append(ip).append(":").append(port)
.append("/eventMesh/msg/push");
HttpTinyClient.HttpResult result = null;
try {
if (log.isInfoEnabled()) {
log.info("pushMsgToEventMesh,targetUrl:{},msg:{}", targetUrl, msg);
}
List<String> paramValues = new ArrayList<String>();
paramValues.add(EventMeshConstants.MANAGE_MSG);
paramValues.add(JsonUtils.toJSONString(msg));
paramValues.add(EventMeshConstants.MANAGE_GROUP);
paramValues.add(group);
result = HttpTinyClient.httpPost(
targetUrl.toString(),
null,
paramValues,
StandardCharsets.UTF_8.name(),
3000);
} catch (Exception e) {
log.error("httpPost " + targetUrl + " is fail,", e);
throw e;
}
if (200 == result.getCode() && result.getContent() != null) {
return result.getContent();
} else {
throw new Exception("httpPost targetUrl[" + targetUrl
+ "] is not OK when getContentThroughHttp, httpResult: " + result + ".");
}
}
public MQConsumerWrapper getPersistentMsgConsumer() {
return persistentMsgConsumer;
}
private void sendMsgBackToBroker(CloudEvent event, String bizSeqNo) throws Exception {
try {
String topic = event.getSubject();
if (log.isWarnEnabled()) {
log.warn("send msg back to broker, bizSeqno:{}, topic:{}", bizSeqNo, topic);
}
long startTime = System.currentTimeMillis();
long taskExcuteTime = startTime;
send(new UpStreamMsgContext(null, event, null, startTime, taskExcuteTime),
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
if (log.isInfoEnabled()) {
log.info(
"group:{} consume fail, sendMessageBack success, bizSeqno:{}, "
+ "topic:{}", group, bizSeqNo, topic);
}
}
@Override
public void onException(OnExceptionContext context) {
if (log.isWarnEnabled()) {
log.warn(
"group:{} consume fail, sendMessageBack fail, bizSeqno:{},"
+ " topic:{}", group, bizSeqNo, topic);
}
}
});
eventMeshTcpMonitor.getTcpSummaryMetrics().getEventMesh2mqMsgNum().incrementAndGet();
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("try send msg back to broker failed");
}
throw e;
}
}
}