blob: 672727570b5f7a8d4fa43be7858b6f1ccf250487 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.DistributionAdvisor.InitializationListener;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.ProfileListener;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
 * Listens to a region's {@link CacheDistributionAdvisor} and remembers whether any visited
 * {@link CacheProfile} advertises gateway sender ids or async event queue ids that differ from the
 * ones configured on this region. {@link #checkSenderIds()} turns that remembered state into log
 * warnings, emitted once per transition rather than on every call.
 *
 * <p>
 * The mismatch fields are {@code volatile} and the "already warned" flags are atomic, so
 * {@link #update()} and {@link #checkSenderIds()} may run on different threads.
 */
public class SenderIdMonitor implements ProfileListener, InitializationListener {
  private static final Logger logger = LogService.getLogger();

  private final InternalRegion region;
  private final CacheDistributionAdvisor advisor;

  /**
   * Gateway sender ids of some visited profile that differ from this region's, or {@code null} if
   * the last completed {@link #update()} found no difference.
   */
  private volatile Set<String> illegalGatewaySenderIds = null;

  /**
   * Async event queue ids of some visited profile that differ from this region's visible ones, or
   * {@code null} if the last completed {@link #update()} found no difference.
   */
  private volatile Set<String> illegalAsyncEventQueueIds = null;

  /** {@code true} while the "gateway sender ids differ" warning has been logged and not cleared. */
  private final AtomicBoolean gatewaySenderIdsDifferWarningMessage = new AtomicBoolean();

  /** {@code true} while the "async queue ids differ" warning has been logged and not cleared. */
  private final AtomicBoolean asyncQueueIdsDifferWarningMessage = new AtomicBoolean();

  private SenderIdMonitor(InternalRegion region, CacheDistributionAdvisor advisor) {
    this.region = region;
    this.advisor = advisor;
  }

  /**
   * Creates a monitor for {@code region} and registers it with {@code advisor} as both profile
   * change listener and initialization listener. Registration happens here rather than in the
   * constructor, so the monitor is fully constructed before the advisor can call back into it.
   *
   * @param region the region whose sender ids are compared against the visited profiles
   * @param advisor the advisor that supplies the profiles and the change notifications
   * @return the newly created, already registered monitor
   */
  public static SenderIdMonitor createSenderIdMonitor(InternalRegion region,
      CacheDistributionAdvisor advisor) {
    SenderIdMonitor senderIdMonitor = new SenderIdMonitor(region, advisor);
    advisor.addProfileChangeListener(senderIdMonitor);
    advisor.setInitializationListener(senderIdMonitor);
    return senderIdMonitor;
  }

  @Override
  public void profileCreated(Profile profile) {
    update();
  }

  @Override
  public void profileUpdated(Profile profile) {
    update();
  }

  @Override
  public void profileRemoved(Profile profile, boolean destroyed) {
    update();
  }

  @Override
  public void initialized() {
    update();
  }

  /**
   * Re-evaluates whether any profile known to the advisor disagrees with this region's gateway
   * sender ids or visible async event queue ids. Needs to be called if this region's gateway
   * sender ids or async event queue ids change; it is also invoked on every advisor callback.
   *
   * <p>
   * Does nothing until {@code advisor.pollIsInitialized()} returns {@code true}. The two kinds of
   * ids are tracked independently: a kind that no longer differs is cleared even if the other
   * kind still differs.
   */
  public void update() {
    if (!advisor.pollIsInitialized()) {
      return;
    }
    final Set<String> gatewaySenderIds = region.getGatewaySenderIds();
    final Set<String> visibleAsyncEventQueueIds = region.getVisibleAsyncEventQueueIds();
    // One flag per kind of id, so a lingering mismatch of one kind cannot keep a stale
    // mismatch of the other kind alive.
    final AtomicBoolean gatewaySenderIdsDiffer = new AtomicBoolean();
    final AtomicBoolean asyncEventQueueIdsDiffer = new AtomicBoolean();
    advisor.accept((visitedAdvisor, profile, index, total, aggregate) -> {
      if (profile instanceof CacheProfile) {
        final CacheProfile cacheProfile = (CacheProfile) profile;
        if (!gatewaySenderIds.equals(cacheProfile.gatewaySenderIds)) {
          gatewaySenderIdsDiffer.set(true);
          illegalGatewaySenderIds = cacheProfile.gatewaySenderIds;
        }
        if (!visibleAsyncEventQueueIds.equals(cacheProfile.asyncEventQueueIds)) {
          asyncEventQueueIdsDiffer.set(true);
          illegalAsyncEventQueueIds = cacheProfile.asyncEventQueueIds;
        }
      }
      // NOTE(review): presumably true tells the advisor to keep visiting profiles - confirm
      // against the visitor contract.
      return true;
    }, null);
    if (!gatewaySenderIdsDiffer.get()) {
      illegalGatewaySenderIds = null;
    }
    if (!asyncEventQueueIdsDiffer.get()) {
      illegalAsyncEventQueueIds = null;
    }
  }

  /**
   * @return {@code true} if the "gateway sender ids differ" warning has been logged and the
   *         matching "corrected" message has not been logged since
   */
  @VisibleForTesting
  public boolean getGatewaySenderIdsDifferWarningMessage() {
    return gatewaySenderIdsDifferWarningMessage.get();
  }

  /**
   * @return {@code true} if the "async queue ids differ" warning has been logged and the matching
   *         "corrected" message has not been logged since
   */
  @VisibleForTesting
  public boolean getAsyncQueueIdsDifferWarningMessage() {
    return asyncQueueIdsDifferWarningMessage.get();
  }

  /**
   * Logs a warning the first time a mismatch recorded by {@link #update()} is observed, and logs a
   * follow-up message the first time the mismatch is observed to be gone again. Gateway sender ids
   * and async event queue ids are handled independently. Repeated calls without a state change
   * log nothing.
   */
  public void checkSenderIds() {
    // Read each volatile field once so the null check and the logged value cannot disagree
    // when update() runs concurrently.
    final Set<String> differingGatewaySenderIds = illegalGatewaySenderIds;
    if (differingGatewaySenderIds != null) {
      // compareAndSet lets exactly one caller win the transition and emit the message.
      if (gatewaySenderIdsDifferWarningMessage.compareAndSet(false, true)) {
        logger.warn(
            "Region {} has {} gateway sender IDs. Another member has the same region with {} gateway sender IDs. For the same region, across all members, gateway sender ids should be same.",
            region.getName(), region.getGatewaySenderIds(), differingGatewaySenderIds);
      }
    } else if (gatewaySenderIdsDifferWarningMessage.compareAndSet(true, false)) {
      logger.warn(
          "Region {} now has the same gateway sender IDs on all members. The previous problem with them being different has been corrected.",
          region.getName());
    }

    final Set<String> differingAsyncEventQueueIds = illegalAsyncEventQueueIds;
    if (differingAsyncEventQueueIds != null) {
      if (asyncQueueIdsDifferWarningMessage.compareAndSet(false, true)) {
        logger.warn(
            "Region {} has {} AsyncEvent queue IDs. Another member has the same region with {} AsyncEvent queue IDs. For the same region, across all members, AsyncEvent queue IDs should be same.",
            region.getName(), region.getVisibleAsyncEventQueueIds(), differingAsyncEventQueueIds);
      }
    } else if (asyncQueueIdsDifferWarningMessage.compareAndSet(true, false)) {
      logger.warn(
          "Region {} now has the same AsyncEvent queue IDs on all members. The previous problem with them being different has been corrected.",
          region.getName());
    }
  }
}