blob: 2098a8e24fea98152560e14e30d149917699a384 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR_PP;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
import static org.apache.geode.logging.internal.spi.LoggingProvider.SECURITY_LOGGER_NAME;
import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.apache.shiro.subject.Subject;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.InterestRegistrationEvent;
import org.apache.geode.cache.InterestRegistrationListener;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.PoolImpl.PoolTask;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.ClassLoadUtils;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.cache.CacheClientStatus;
import org.apache.geode.internal.cache.CacheDistributionAdvisor;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.ClientRegionEventImpl;
import org.apache.geode.internal.cache.ClientServerObserver;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheEvent;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.RegionEventImpl;
import org.apache.geode.internal.cache.ha.HAContainerMap;
import org.apache.geode.internal.cache.ha.HAContainerRegion;
import org.apache.geode.internal.cache.ha.HAContainerWrapper;
import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.cache.tier.InterestType;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.OverflowAttributes;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.net.SocketCloser;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.statistics.DummyStatisticsFactory;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.security.AccessControl;
import org.apache.geode.security.AuthenticationExpiredException;
import org.apache.geode.security.AuthenticationFailedException;
import org.apache.geode.security.AuthenticationRequiredException;
import org.apache.geode.security.GemFireSecurityException;
import org.apache.geode.util.internal.GeodeGlossary;
/**
* Class {@code CacheClientNotifier} works on the server and manages client socket connections
* to clients requesting notification of updates and notifies them when updates occur.
*
* @since GemFire 3.2
*/
public class CacheClientNotifier {
private static final Logger logger = LogService.getLogger();
private static final Logger secureLogger = LogService.getLogger(SECURITY_LOGGER_NAME);
@MakeNotStatic
private static volatile CacheClientNotifier ccnSingleton;
private final SocketMessageWriter socketMessageWriter;
private final ClientRegistrationEventQueueManager clientRegistrationEventQueueManager;
private final CacheClientProxyFactory cacheClientProxyFactory;
@VisibleForTesting
static CacheClientNotifier getInstance(InternalCache cache,
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
StatisticsClock statisticsClock,
CacheServerStats acceptorStats,
int maximumMessageCount,
int messageTimeToLive,
ConnectionListener listener,
OverflowAttributes overflowAttributes,
boolean isGatewayReceiver,
SocketMessageWriter socketMessageWriter) {
if (ccnSingleton == null) {
ccnSingleton = new CacheClientNotifier(cache, clientRegistrationEventQueueManager,
statisticsClock, acceptorStats, maximumMessageCount, messageTimeToLive, listener,
isGatewayReceiver, new CacheClientProxyFactory(), socketMessageWriter);
}
if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
// Gateway receiver might have create CCN instance without HaContainer
// In this case, the HaContainer should be lazily created here
ccnSingleton.initHaContainer(overflowAttributes);
}
return ccnSingleton;
}
/**
* Factory method to construct a CacheClientNotifier {@code CacheClientNotifier} instance.
*
* @param cache The GemFire {@code InternalCache}
* @param clientRegistrationEventQueueManager Manages temporary registration queues for clients
* @return A {@code CacheClientNotifier} instance
*/
public static synchronized CacheClientNotifier getInstance(InternalCache cache,
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
StatisticsClock statisticsClock,
CacheServerStats acceptorStats,
int maximumMessageCount,
int messageTimeToLive,
ConnectionListener listener,
OverflowAttributes overflowAttributes,
boolean isGatewayReceiver) {
return getInstance(cache, clientRegistrationEventQueueManager, statisticsClock,
acceptorStats, maximumMessageCount, messageTimeToLive, listener, overflowAttributes,
isGatewayReceiver, new SocketMessageWriter());
}
public static CacheClientNotifier getInstance() {
return ccnSingleton;
}
@VisibleForTesting
public static void resetInstance() {
ccnSingleton = null;
}
/**
* Registers a new client updater that wants to receive updates with this server.
*
* @param socket The socket over which the server communicates with the client.
* @param isPrimary Whether server is the primary subscription end point for this client
* @param acceptorId ID of the acceptor used to clean up the client connection
* @param notifyBySubscription Whether the Server is running in NotifyBySubscription mode
* @throws IOException Can occur if there are issues communicating over the socket
*/
public void registerClient(final ClientRegistrationMetadata clientRegistrationMetadata,
final Socket socket, final boolean isPrimary, final long acceptorId,
final boolean notifyBySubscription) throws IOException {
long startTime = statistics.startTime();
ClientProxyMembershipID clientProxyMembershipID =
clientRegistrationMetadata.getClientProxyMembershipID();
DataOutputStream dataOutputStream = clientRegistrationMetadata.getDataOutputStream();
KnownVersion clientVersion = clientRegistrationMetadata.getClientVersion();
try {
if (isClientPermitted(clientRegistrationMetadata, clientProxyMembershipID)) {
ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
new ConcurrentLinkedQueue<>(),
new ReentrantReadWriteLock());
try {
registerClientInternal(clientRegistrationMetadata, socket, isPrimary, acceptorId,
notifyBySubscription);
} finally {
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, this);
}
}
} catch (AuthenticationRequiredException ex) {
handleAuthenticationException(clientProxyMembershipID, dataOutputStream, clientVersion, ex,
Handshake.REPLY_EXCEPTION_AUTHENTICATION_REQUIRED);
} catch (AuthenticationFailedException | AuthenticationExpiredException ex) {
handleAuthenticationException(clientProxyMembershipID, dataOutputStream, clientVersion, ex,
Handshake.REPLY_EXCEPTION_AUTHENTICATION_FAILED);
} catch (CacheException e) {
logger.warn(
String.format("%s :registerClient: Exception encountered in registration %s", this, e),
e);
throw new IOException(String.format(
"Exception occurred while trying to register interest due to : %s", e.getMessage()), e);
} catch (Exception ex) {
logger.warn(String.format("An exception was thrown for client [%s].",
clientProxyMembershipID != null ? clientProxyMembershipID : "unknown"), ex);
socketMessageWriter.writeException(dataOutputStream,
CommunicationMode.UnsuccessfulServerToClient.getModeNumber(), ex, clientVersion);
}
statistics.endClientRegistration(startTime);
}
/**
* Continues the registration of a new client that wants to receive updates with this server.
*
* @param clientRegistrationMetadata Contains registration info pertaining to the client
* @param socket The socket over which the server communicates with the client.
* @param isPrimary Whether server is the primary subscription end point for this client
* @param acceptorId ID of the acceptor used to clean up the client connection
* @param notifyBySubscription Whether the Server is running in NotifyBySubscription mode
* @throws IOException Can occur if there are issues communicating over the socket
* @throws CacheException A generic exception, which indicates a cache error has occurred.
* @throws ClassNotFoundException Thrown when the ClientProxyMembershipID class is not found
* @throws NoSuchMethodException Potentially thrown by performPostAuthorization
* @throws InvocationTargetException Potentially thrown by performPostAuthorization
* @throws IllegalAccessException Potentially thrown by performPostAuthorization
*/
void registerClientInternal(final ClientRegistrationMetadata clientRegistrationMetadata,
final Socket socket,
final boolean isPrimary,
final long acceptorId, final boolean notifyBySubscription)
throws IOException, CacheException, ClassNotFoundException, NoSuchMethodException,
InvocationTargetException, IllegalAccessException {
ClientProxyMembershipID clientProxyMembershipID =
clientRegistrationMetadata.getClientProxyMembershipID();
byte clientConflation = clientRegistrationMetadata.getClientConflation();
KnownVersion clientVersion = clientRegistrationMetadata.getClientVersion();
CacheClientProxy cacheClientProxy = getClientProxy(clientProxyMembershipID);
DistributedMember member = clientProxyMembershipID.getDistributedMember();
DistributedSystem system = getCache().getDistributedSystem();
Properties sysProps = system.getProperties();
String authenticator = sysProps.getProperty(SECURITY_CLIENT_AUTHENTICATOR);
// in multi-user case, this would return null
Object subjectOrPrincipal =
getSubjectOrPrincipal(clientRegistrationMetadata, member, system, authenticator);
Subject subject = subjectOrPrincipal instanceof Subject ? (Subject) subjectOrPrincipal : null;
// Initialize the socket
socket.setTcpNoDelay(true);
socket.setSendBufferSize(CacheClientNotifier.socketBufferSize);
socket.setReceiveBufferSize(CacheClientNotifier.socketBufferSize);
if (logger.isDebugEnabled()) {
logger.debug(
"CacheClientNotifier: Initialized server-to-client socket with send buffer size: {} bytes and receive buffer size: {} bytes",
socket.getSendBufferSize(), socket.getReceiveBufferSize());
}
// Determine whether the client is durable or not.
byte responseByte = CommunicationMode.SuccessfulServerToClient.getModeNumber();
boolean successful = true;
boolean clientIsDurable = clientProxyMembershipID.isDurable();
if (logger.isDebugEnabled()) {
if (clientIsDurable) {
logger.debug("CacheClientNotifier: Attempting to register durable client: {}",
clientProxyMembershipID.getDurableId());
} else {
logger.debug("CacheClientNotifier: Attempting to register non-durable client");
}
}
byte endpointType = 0x00;
int queueSize = 0;
String unsuccessfulMsg = null;
if (clientIsDurable) {
if (cacheClientProxy == null) {
if (isTimedOut(clientProxyMembershipID)) {
queueSize = PoolImpl.PRIMARY_QUEUE_TIMED_OUT;
} else {
queueSize = PoolImpl.PRIMARY_QUEUE_NOT_AVAILABLE;
}
// No proxy exists for this durable client. It must be created.
if (logger.isDebugEnabled()) {
logger.debug(
"CacheClientNotifier: No proxy exists for durable client with id {}. It must be created.",
clientProxyMembershipID.getDurableId());
}
cacheClientProxy =
cacheClientProxyFactory.create(this, socket, clientProxyMembershipID, isPrimary,
clientConflation, clientVersion, acceptorId, notifyBySubscription,
cache.getSecurityService(), subject, statisticsClock);
successful = initializeProxy(cacheClientProxy);
} else {
cacheClientProxy.setSubject(subject);
if (cacheClientProxy.isPrimary()) {
endpointType = (byte) 2;
} else {
endpointType = (byte) 1;
}
queueSize = cacheClientProxy.getQueueSize();
// A proxy exists for this durable client. It must be reinitialized.
if (cacheClientProxy.isPaused()) {
if (CacheClientProxy.testHook != null) {
CacheClientProxy.testHook.doTestHook("CLIENT_PRE_RECONNECT");
}
if (cacheClientProxy.lockDrain()) {
try {
if (logger.isDebugEnabled()) {
logger.debug(
"CacheClientNotifier: A proxy exists for durable client with id {}. This proxy will be reinitialized: {}",
clientProxyMembershipID.getDurableId(), cacheClientProxy);
}
statistics.incDurableReconnectionCount();
cacheClientProxy.getProxyID()
.updateDurableTimeout(clientProxyMembershipID.getDurableTimeout());
cacheClientProxy.reinitialize(socket, clientProxyMembershipID,
isPrimary, clientConflation,
clientVersion);
cacheClientProxy.setMarkerEnqueued(true);
if (CacheClientProxy.testHook != null) {
CacheClientProxy.testHook.doTestHook("CLIENT_RECONNECTED");
}
} finally {
cacheClientProxy.unlockDrain();
}
} else {
unsuccessfulMsg =
"CacheClientNotifier: Connection refused due to cq queue being drained from admin command, please wait...";
logger.warn(unsuccessfulMsg);
responseByte = Handshake.REPLY_REFUSED;
if (CacheClientProxy.testHook != null) {
CacheClientProxy.testHook.doTestHook("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED");
}
}
} else {
// The existing proxy is already running (which means that another
// client is already using this durable id.
unsuccessfulMsg =
String.format(
"The requested durable client has the same identifier ( %s ) as an existing durable client ( %s ). Duplicate durable clients are not allowed.",
clientProxyMembershipID.getDurableId(), cacheClientProxy);
logger.warn(unsuccessfulMsg);
// Set the unsuccessful response byte.
responseByte = Handshake.REPLY_EXCEPTION_DUPLICATE_DURABLE_CLIENT;
}
}
} else {
CacheClientProxy staleClientProxy = getClientProxy(clientProxyMembershipID);
if (staleClientProxy != null) {
// A proxy exists for this non-durable client. It must be closed.
if (logger.isDebugEnabled()) {
logger.debug(
"CacheClientNotifier: A proxy exists for this non-durable client. It must be closed.");
}
if (staleClientProxy.startRemoval()) {
staleClientProxy.waitRemoval();
} else {
staleClientProxy.close(false, false); // do not check for queue, just close it
removeClientProxy(staleClientProxy); // remove old proxy from proxy set
}
} // non-null stale proxy
// Create the new proxy for this non-durable client
cacheClientProxy =
new CacheClientProxy(this, socket, clientProxyMembershipID, isPrimary, clientConflation,
clientVersion, acceptorId, notifyBySubscription, cache.getSecurityService(), subject,
statisticsClock);
successful = initializeProxy(cacheClientProxy);
}
if (!successful) {
cacheClientProxy = null;
responseByte = Handshake.REPLY_REFUSED;
unsuccessfulMsg =
String.format(
"A previous connection attempt from this client is still being processed: %s",
clientProxyMembershipID);
logger.warn(unsuccessfulMsg);
} else if (subject != null) {
secureLogger.debug("CacheClientProxy {} using subject {} {} ", cacheClientProxy,
subject.getPrincipal(), subject);
}
// Tell the client that the proxy has been registered using the response
// byte. This byte will be read on the client by the CacheClientUpdater to
// determine whether the registration was successful. The times when
// registration is unsuccessful currently are if a duplicate durable client
// is attempted to be registered or authentication fails.
try {
DataOutputStream dos =
new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
// write the message type, message length and the error message (if any)
socketMessageWriter.writeHandshakeMessage(dos, responseByte, unsuccessfulMsg, clientVersion,
endpointType, queueSize);
} catch (IOException ioe) {// remove the added proxy if we get IOException.
if (cacheClientProxy != null) {
// do not check for queue, just close it
boolean keepProxy = cacheClientProxy.close(false, false);
if (!keepProxy) {
removeClientProxy(cacheClientProxy);
}
}
throw ioe;
}
if (unsuccessfulMsg != null && logger.isDebugEnabled()) {
logger.debug(unsuccessfulMsg);
}
// If the client is not durable, start its message processor
// Starting it here (instead of in the CacheClientProxy constructor)
// will ensure that the response byte is sent to the client before
// the marker message. If the client is durable, the message processor
// is not started until the clientReady message is received.
if (!clientIsDurable && cacheClientProxy != null
&& responseByte == CommunicationMode.SuccessfulServerToClient.getModeNumber()) {
// The startOrResumeMessageDispatcher tests if the proxy is a primary.
// If this is a secondary proxy, the dispatcher is not started.
// The false parameter signifies that a marker message has not already been
// processed. This will generate and send one.
cacheClientProxy.startOrResumeMessageDispatcher(false);
}
if (responseByte == CommunicationMode.SuccessfulServerToClient.getModeNumber()) {
if (logger.isDebugEnabled()) {
logger.debug("CacheClientNotifier: Successfully registered {}", cacheClientProxy);
}
performPostAuthorization(cacheClientProxy, clientProxyMembershipID, member,
sysProps,
subjectOrPrincipal);
} else {
try {
// prevent leak by closing socket
socket.close();
} catch (IOException ignore) {
}
logger.warn(
"CacheClientNotifier: Unsuccessfully registered client with identifier {} and response code {}",
new Object[] {clientProxyMembershipID, responseByte});
}
}
private void handleAuthenticationException(final ClientProxyMembershipID clientProxyMembershipID,
final DataOutputStream dataOutputStream, final KnownVersion clientVersion,
final GemFireSecurityException ex, final byte replyExceptionAuthenticationFailed)
throws IOException {
securityLogWriter.warning(
String.format("An exception was thrown for client [%s]. %s",
clientProxyMembershipID, ex));
socketMessageWriter.writeException(dataOutputStream, replyExceptionAuthenticationFailed, ex,
clientVersion);
}
private boolean initializeProxy(CacheClientProxy l_proxy) throws CacheException {
if (!isProxyInInitializationMode(l_proxy)) {
if (logger.isDebugEnabled()) {
logger.debug("Initializing proxy: {}", l_proxy);
}
try {
// Add client proxy to initialization list. This has to be done before
// the queue is created so that events can be buffered here for delivery
// to the queue once it's initialized (bug #41681 and others)
addClientInitProxy(l_proxy);
l_proxy.initializeMessageDispatcher();
// Initialization success. Add to client proxy list.
addClientProxy(l_proxy);
return true;
} catch (RegionExistsException ree) {
if (logger.isDebugEnabled()) {
String name = ree.getRegion() != null ? ree.getRegion().getFullPath() : "null region";
logger.debug("Found RegionExistsException while initializing proxy. Region name: {}",
name);
}
// This will return false;
} finally {
removeClientInitProxy(l_proxy);
}
}
return false;
}
/**
* Makes Primary to this CacheClientProxy and start the dispatcher of the CacheClientProxy
*
* @param isClientReady Whether the marker has already been processed. This value helps determine
* whether to start the dispatcher.
*/
public void makePrimary(ClientProxyMembershipID proxyId, boolean isClientReady) {
CacheClientProxy proxy = getClientProxy(proxyId);
if (proxy == null) {
throw new InternalGemFireError("No cache client proxy on this node for proxyId " + proxyId);
}
proxy.setPrimary(true);
/*
* If the client represented by this proxy has: - already processed the marker message
* (meaning the client is failing over to this server as its primary) <or> - is not durable
* (meaning the marker message is being processed automatically
*
* Then, start or resume the dispatcher. Otherwise, let the clientReady message start the
* dispatcher. See CacheClientProxy.startOrResumeMessageDispatcher if
* (!proxy._messageDispatcher.isAlive()) {
*/
if (!proxy.isDurable()) {
logger.debug("CacheClientNotifier: Notifying non-durable proxy to start dispatcher for: {}",
proxy);
proxy.startOrResumeMessageDispatcher(false);
} else if (isClientReady) {
logger.debug("CacheClientNotifier: Notifying durable proxy to start dispatcher for: {}",
proxy);
// if durable client is failing over to this server as its primary, the marker message may or
// may not have been delivered to the client yet. If we put the marker on the queue,
// the marker may be pushed all the way at the end of the queue resulting in the update events
// being not processed normally by the client. So we will need to send the marker directly
// to the client instead of enqueuing it.
proxy.startOrResumeMessageDispatcher(true);
}
}
/**
* Adds or updates entry in the dispatched message map when client sends an ack.
*/
public boolean processDispatchedMessage(ClientProxyMembershipID proxyId, EventID eid) {
boolean success = false;
CacheClientProxy proxy = getClientProxy(proxyId);
if (proxy != null) {
HARegionQueue harq = proxy.getHARegionQueue();
harq.addDispatchedMessage(new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID()),
eid.getSequenceID());
success = true;
}
return success;
}
/**
* Sets keepalive on the proxy of the given membershipID
*
* @param membershipID Uniquely identifies the client pool
* @since GemFire 5.7
*/
public void setKeepAlive(ClientProxyMembershipID membershipID, boolean keepAlive) {
if (logger.isDebugEnabled()) {
logger.debug("CacheClientNotifier: setKeepAlive client: {}", membershipID);
}
CacheClientProxy proxy = getClientProxy(membershipID);
if (proxy != null) {
proxy.setKeepAlive(keepAlive);
}
}
/**
* Unregisters an existing client from this server.
*
* @param memberId Uniquely identifies the client
*/
void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
if (logger.isDebugEnabled()) {
logger.debug("CacheClientNotifier: Unregistering all clients with member id: {}", memberId);
}
CacheClientProxy proxy = getClientProxy(memberId);
if (proxy != null) {
final boolean isTraceEnabled = logger.isTraceEnabled();
if (isTraceEnabled) {
logger.trace("CacheClientNotifier: Potential client: {}", proxy);
}
// If the proxy's member id is the same as the input member id, add
// it to the set of dead proxies.
if (!proxy.startRemoval()) {
if (isTraceEnabled) {
logger.trace("CacheClientNotifier: Potential client: {} matches {}", proxy, memberId);
}
closeDeadProxies(Collections.singletonList(proxy), normalShutdown);
}
}
}
/**
* The client represented by the proxyId is ready to receive updates.
*/
public void readyForEvents(ClientProxyMembershipID proxyId) {
CacheClientProxy proxy = getClientProxy(proxyId);
if (proxy != null) {
// False signifies that a marker message has not already been processed.
// Generate and send one.
proxy.startOrResumeMessageDispatcher(false);
}
}
ClientUpdateMessageImpl constructClientMessage(InternalCacheEvent event) {
ClientUpdateMessageImpl clientMessage = null;
EnumListenerEvent operation = event.getEventType();
try {
clientMessage = initializeMessage(operation, event);
} catch (Exception e) {
logger.fatal(String.format(
"CacheClientNotifier: Cannot notify clients to perform operation %s on event %s",
operation, event),
e);
}
return clientMessage;
}
/**
* notify interested clients of the given cache event. The event should have routing information
* in it that determines which clients will receive the event.
*/
public static void notifyClients(InternalCacheEvent event) {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
instance.singletonNotifyClients(event, null);
}
}
public static boolean singletonHasClientProxies() {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
return instance.hasClientProxies();
}
return false;
}
private boolean hasClientProxies() {
return !_initClientProxies.isEmpty() || !_clientProxies.isEmpty();
}
/**
* notify interested clients of the given cache event using the given update message. The event
* should have routing information in it that determines which clients will receive the event.
*/
public static void notifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg) {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
instance.singletonNotifyClients(event, cmsg);
}
}
private void singletonNotifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg) {
if (!hasClientProxies()) {
return;
}
FilterInfo filterInfo = event.getLocalFilterInfo();
if (filterInfo != null) {
// if the routing was made using an old profile we need to recompute it
if (logger.isTraceEnabled()) {
logger.trace("Event isOriginRemote={}", event.isOriginRemote());
}
}
if (filterInfo == null
|| filterInfo.getCQs() == null && filterInfo.getInterestedClients() == null
&& filterInfo.getInterestedClientsInv() == null) {
return;
}
long startTime = statistics.startTime();
ClientUpdateMessageImpl clientMessage;
if (cmsg == null) {
clientMessage = constructClientMessage(event);
} else {
clientMessage = (ClientUpdateMessageImpl) cmsg;
}
if (clientMessage == null) {
return;
}
FilterProfile regionProfile = ((InternalRegion) event.getRegion()).getFilterProfile();
Set<ClientProxyMembershipID> filterClients =
getFilterClientIDs(event, regionProfile, filterInfo, clientMessage);
final Conflatable conflatable;
if (clientMessage instanceof ClientTombstoneMessage) {
// HAEventWrapper deserialization can't handle subclasses of ClientUpdateMessageImpl, so don't
// wrap them
conflatable = clientMessage;
} else {
HAEventWrapper wrapper = new HAEventWrapper(clientMessage);
wrapper.incrementPutInProgressCounter("notify clients");
conflatable = wrapper;
// include new value in event if the entry is not a tombstone and there are clients
if (!filterClients.isEmpty() && event.getOperation().isEntry()) {
EntryEventImpl entryEvent = (EntryEventImpl) event;
entryEvent.exportNewValue(clientMessage);
}
}
// add event to temporary queue for clients in process of registering (if any)
clientRegistrationEventQueueManager.add(event, clientMessage, conflatable, filterClients, this);
singletonRouteClientMessage(conflatable, filterClients);
statistics.endEvent(startTime);
// Cleanup destroyed events in CQ result cache.
// While maintaining the CQ results key caching. the destroy event
// keys are marked as destroyed instead of removing them, this is
// to take care, arrival of duplicate events. The key marked as
// destroyed are removed after the event is placed in clients HAQueue.
if (filterInfo.filterProcessedLocally) {
removeDestroyTokensFromCqResultKeys(event, filterInfo);
}
}
Set<ClientProxyMembershipID> getFilterClientIDs(final InternalCacheEvent event,
final FilterProfile regionProfile,
final FilterInfo filterInfo,
final ClientUpdateMessageImpl clientMessage) {
// Holds the clientIds to which filter message needs to be sent.
Set<ClientProxyMembershipID> filterClients = new HashSet<>();
// Add CQ info.
if (filterInfo.getCQs() != null) {
for (Map.Entry<Long, MessageType> e : filterInfo.getCQs().entrySet()) {
Long cqID = e.getKey();
String cqName = regionProfile.getRealCqID(cqID);
if (cqName == null) {
continue;
}
ServerCQ cq = regionProfile.getCq(cqName);
if (cq != null) {
ClientProxyMembershipID id = cq.getClientProxyId();
filterClients.add(id);
if (logger.isDebugEnabled()) {
logger.debug("Adding cq routing info to message for id: {} and cq: {}", id, cqName);
}
clientMessage.addClientCq(id, cq.getName(), e.getValue());
}
}
}
// Add interestList info.
if (filterInfo.getInterestedClientsInv() != null) {
Set<Object> rawIDs =
uncheckedCast(regionProfile.getRealClientIDs(filterInfo.getInterestedClientsInv()));
Set<ClientProxyMembershipID> ids = getProxyIDs(rawIDs);
incMessagesNotQueuedOriginatorStat(event, ids);
if (!ids.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("adding invalidation routing to message for {}", ids);
}
clientMessage.addClientInterestList(ids, false);
filterClients.addAll(ids);
}
}
if (filterInfo.getInterestedClients() != null) {
Set<Object> rawIDs =
uncheckedCast(regionProfile.getRealClientIDs(filterInfo.getInterestedClients()));
Set<ClientProxyMembershipID> ids = getProxyIDs(rawIDs);
incMessagesNotQueuedOriginatorStat(event, ids);
if (!ids.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("adding routing to message for {}", ids);
}
clientMessage.addClientInterestList(ids, true);
filterClients.addAll(ids);
}
}
return filterClients;
}
private boolean isClientPermitted(ClientRegistrationMetadata clientRegistrationMetadata,
ClientProxyMembershipID clientProxyMembershipID) throws IOException {
if (getDenylistedClient().contains(clientProxyMembershipID)) {
Exception deniedException = new Exception("This client is denylisted by server");
socketMessageWriter.writeException(clientRegistrationMetadata.getDataOutputStream(),
Handshake.REPLY_INVALID, deniedException, clientRegistrationMetadata.getClientVersion());
return false;
}
return true;
}
private void incMessagesNotQueuedOriginatorStat(final InternalCacheEvent event,
final Set<ClientProxyMembershipID> ids) {
// don't send to member of origin
ClientProxyMembershipID eventOriginator = event.getContext();
if (eventOriginator != null) {
if (ids.remove(eventOriginator)) {
CacheClientProxy ccp = getClientProxy(eventOriginator);
if (ccp != null) {
ccp.getStatistics().incMessagesNotQueuedOriginator();
}
}
}
}
private void removeDestroyTokensFromCqResultKeys(InternalCacheEvent event,
FilterInfo filterInfo) {
FilterProfile regionProfile = ((InternalRegion) event.getRegion()).getFilterProfile();
if (event.getOperation().isEntry() && filterInfo.getCQs() != null) {
EntryEvent<?, ?> entryEvent = (EntryEvent<?, ?>) event;
for (Map.Entry<Long, MessageType> e : filterInfo.getCQs().entrySet()) {
Long cqID = e.getKey();
String cqName = regionProfile.getRealCqID(cqID);
if (cqName != null) {
ServerCQ cq = regionProfile.getCq(cqName);
if (cq != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
cq.removeFromCqResultKeys(entryEvent.getKey(), true);
}
}
}
}
}
private void performPostAuthorization(final CacheClientProxy proxy,
final ClientProxyMembershipID clientProxyMembershipID, final DistributedMember member,
final Properties sysProps, final Object subjectOrPrincipal)
throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
InvocationTargetException {
if (proxy != null && subjectOrPrincipal != null) {
if (subjectOrPrincipal instanceof Principal) {
Principal principal = (Principal) subjectOrPrincipal;
if (securityLogWriter.fineEnabled()) {
securityLogWriter
.fine("CacheClientNotifier: successfully verified credentials for proxyID: "
+ clientProxyMembershipID
+ " having principal: " + principal.getName());
}
String postAuthzFactoryName = sysProps.getProperty(SECURITY_CLIENT_ACCESSOR_PP);
AccessControl authzCallback = null;
if (postAuthzFactoryName != null && !postAuthzFactoryName.isEmpty()) {
Method authzMethod = ClassLoadUtils.methodFromName(postAuthzFactoryName);
authzCallback = (AccessControl) authzMethod.invoke(null, (Object[]) null);
authzCallback.init(principal, member, getCache());
}
proxy.setPostAuthzCallback(authzCallback);
}
}
}
private Object getSubjectOrPrincipal(final ClientRegistrationMetadata clientRegistrationMetadata,
final DistributedMember member, final DistributedSystem system, final String authenticator) {
final Object subjectOrPrincipal;
// in multi-user case, this would be null
if (clientRegistrationMetadata.getClientCredentials() != null) {
secureLogger.debug("CacheClientNotifier: verifying credentials for proxyID: "
+ clientRegistrationMetadata.getClientProxyMembershipID());
subjectOrPrincipal = Handshake
.verifyCredentials(authenticator,
clientRegistrationMetadata.getClientCredentials(),
system.getSecurityProperties(),
logWriter, securityLogWriter, member,
cache.getSecurityService());
} else {
subjectOrPrincipal = null;
}
return subjectOrPrincipal;
}
/**
* delivers the given message to all proxies for routing. The message should already have client
* interest established, or override the isClientInterested method to implement its own routing
*/
public static void routeClientMessage(Conflatable clientMessage) {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
// ok to use keySet here because all we do is call getClientProxy with these keys
instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet());
}
}
/**
* this is for server side registration of client queue
*/
static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
ClientProxyMembershipID clientProxyMembershipId) {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
instance.singletonRouteClientMessage(clientMessage,
Collections.singleton(clientProxyMembershipId));
}
}
private void singletonRouteClientMessage(Conflatable conflatable,
Collection<ClientProxyMembershipID> filterClients) {
cache.getCancelCriterion().checkCancelInProgress(null);
List<CacheClientProxy> deadProxies = null;
for (ClientProxyMembershipID clientId : filterClients) {
CacheClientProxy proxy;
proxy = getClientProxy(clientId, true);
if (proxy != null) {
if (proxy.isAlive() || proxy.isPaused() || proxy.isConnected() || proxy.isDurable()) {
proxy.deliverMessage(conflatable);
} else {
proxy.getStatistics().incMessagesFailedQueued();
if (deadProxies == null) {
deadProxies = new ArrayList<>();
}
deadProxies.add(proxy);
}
denyListSlowReceiver(proxy);
}
}
if (conflatable instanceof HAEventWrapper) {
((HAEventWrapper) conflatable).decrementPutInProgressCounter();
}
// Remove any dead clients from the clients to notify
if (deadProxies != null) {
closeDeadProxies(deadProxies, false);
}
}
/**
* processes the given collection of durable and non-durable client identifiers, returning a
* collection of non-durable identifiers of clients connected to this VM
*/
Set<ClientProxyMembershipID> getProxyIDs(Set<?> mixedDurableAndNonDurableIDs) {
Set<ClientProxyMembershipID> result = ConcurrentHashMap.newKeySet();
for (Object id : mixedDurableAndNonDurableIDs) {
if (id instanceof String) {
CacheClientProxy clientProxy = getClientProxy((String) id, true);
if (clientProxy != null) {
result.add(clientProxy.getProxyID());
}
// else { we don't have a proxy for the given durable ID }
} else {
// try to canonicalize the ID.
CacheClientProxy proxy = getClientProxy((ClientProxyMembershipID) id, true);
if (proxy != null) {
result.add(proxy.getProxyID());
}
}
}
return result;
}
private void denyListSlowReceiver(CacheClientProxy clientProxy) {
final CacheClientProxy proxy = clientProxy;
if (proxy.getHARegionQueue() != null && proxy.getHARegionQueue().isClientSlowReceiver()
&& !denyListedClients.contains(proxy.getProxyID())) {
// log alert with client info.
logger.warn("Client {} is a slow receiver.",
new Object[] {proxy.getProxyID()});
addToDenylistedClient(proxy.getProxyID());
InternalDistributedSystem ids =
(InternalDistributedSystem) getCache().getDistributedSystem();
final DistributionManager dm = ids.getDistributionManager();
dm.getExecutors().getWaitingThreadPool().execute(() -> {
CacheDistributionAdvisor advisor =
proxy.getHARegionQueue().getRegion().getCacheDistributionAdvisor();
Set<InternalDistributedMember> members = advisor.adviseCacheOp();
// Send client denylist message
ClientDenylistProcessor.sendDenylistedClient(proxy.getProxyID(), dm, members);
// close the proxy for slow receiver.
proxy.close(false, false);
removeClientProxy(proxy);
if (PoolImpl.AFTER_QUEUE_DESTROY_MESSAGE_FLAG) {
ClientServerObserver bo = ClientServerObserverHolder.getInstance();
bo.afterQueueDestroyMessage();
}
// send remove from denylist.
RemoveClientFromDenylistMessage rcm = new RemoveClientFromDenylistMessage();
rcm.setProxyID(proxy.getProxyID());
dm.putOutgoing(rcm);
denyListedClients.remove(proxy.getProxyID());
});
}
}
/**
* Initializes a {@code ClientUpdateMessage} from an operation and event
*
* @param operation The operation that occurred (e.g. AFTER_CREATE)
* @param event The event containing the data to be updated
* @return a {@code ClientUpdateMessage}
*/
private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation,
CacheEvent<?, ?> event)
throws Exception {
if (!supportsOperation(operation)) {
throw new Exception(
String.format("The cache client notifier does not support operations of type %s",
operation));
}
Object keyOfInterest = null;
final EventID eventIdentifier;
ClientProxyMembershipID membershipID = null;
boolean isNetLoad = false;
Object callbackArgument;
byte[] delta = null;
VersionTag<?> versionTag = null;
if (event.getOperation().isEntry()) {
EntryEventImpl entryEvent = (EntryEventImpl) event;
versionTag = entryEvent.getVersionTag();
delta = entryEvent.getDeltaBytes();
callbackArgument = entryEvent.getRawCallbackArgument();
if (entryEvent.isBridgeEvent()) {
membershipID = entryEvent.getContext();
}
keyOfInterest = entryEvent.getKey();
eventIdentifier = entryEvent.getEventId();
isNetLoad = entryEvent.isNetLoad();
} else {
RegionEventImpl regionEvent = (RegionEventImpl) event;
callbackArgument = regionEvent.getRawCallbackArgument();
eventIdentifier = regionEvent.getEventId();
if (event instanceof ClientRegionEventImpl) {
ClientRegionEventImpl bridgeEvent = (ClientRegionEventImpl) event;
membershipID = bridgeEvent.getContext();
}
}
// NOTE: If delta is non-null, value MUST be in Object form of type Delta.
ClientUpdateMessageImpl clientUpdateMsg =
new ClientUpdateMessageImpl(operation, (InternalRegion) event.getRegion(), keyOfInterest,
null,
delta, (byte) 0x01, callbackArgument, membershipID, eventIdentifier, versionTag);
if (isNetLoad) {
clientUpdateMsg.setIsNetLoad(isNetLoad);
}
return clientUpdateMsg;
}
/**
* Returns whether the {@code CacheClientNotifier} supports the input operation.
*
* @param operation The operation that occurred (e.g. AFTER_CREATE)
* @return whether the {@code CacheClientNotifier} supports the input operation
*/
private boolean supportsOperation(EnumListenerEvent operation) {
return operation == EnumListenerEvent.AFTER_CREATE
|| operation == EnumListenerEvent.AFTER_UPDATE
|| operation == EnumListenerEvent.AFTER_DESTROY
|| operation == EnumListenerEvent.AFTER_INVALIDATE
|| operation == EnumListenerEvent.AFTER_REGION_DESTROY
|| operation == EnumListenerEvent.AFTER_REGION_CLEAR
|| operation == EnumListenerEvent.AFTER_REGION_INVALIDATE;
}
/**
* Registers client interest in the input region and key.
*
* @param regionName The name of the region of interest
* @param keyOfInterest The name of the key of interest
* @param membershipID clients ID
* @param interestType type of registration
* @param isDurable whether the registration persists when client goes away
* @param sendUpdatesAsInvalidates client wants invalidation messages
* @param manageEmptyRegions whether to book keep empty region information
* @param regionDataPolicy region data policy
*/
public void registerClientInterest(final @NotNull String regionName,
final @NotNull Object keyOfInterest,
final @NotNull ClientProxyMembershipID membershipID,
final @NotNull InterestType interestType,
final boolean isDurable,
final boolean sendUpdatesAsInvalidates,
final boolean manageEmptyRegions,
final @NotNull DataPolicy regionDataPolicy,
final boolean flushState)
throws IOException, RegionDestroyedException {
CacheClientProxy proxy = getClientProxy(membershipID, true);
if (logger.isDebugEnabled()) {
logger.debug(
"CacheClientNotifier: Client {} registering interest in: {} -> {} (an instance of {})",
proxy, regionName, keyOfInterest, keyOfInterest.getClass().getName());
}
if (proxy == null) {
// client should see this and initiates failover
throw new IOException(
"CacheClientProxy for this client is no longer on the server , so registerInterest operation is unsuccessful");
}
boolean done = false;
try {
proxy.registerClientInterest(regionName, keyOfInterest, interestType, isDurable,
sendUpdatesAsInvalidates, flushState);
if (manageEmptyRegions) {
updateMapOfEmptyRegions(proxy.getRegionsWithEmptyDataPolicy(), regionName,
regionDataPolicy);
}
done = true;
} finally {
if (!done) {
proxy.unregisterClientInterest(regionName, keyOfInterest, interestType, false);
}
}
}
/**
* Store region and delta relation
*
* @param regionDataPolicy (0==empty)
* @since GemFire 6.1
*/
public void updateMapOfEmptyRegions(
final @NotNull Map<String, Integer> regionsWithEmptyDataPolicy,
final @NotNull String regionName,
final @Nullable DataPolicy regionDataPolicy) {
if (regionDataPolicy == DataPolicy.EMPTY) {
if (!regionsWithEmptyDataPolicy.containsKey(regionName)) {
regionsWithEmptyDataPolicy.put(regionName, 0);
}
}
}
/**
* Unregisters client interest in the input region and key.
*
* @param regionName The name of the region of interest
* @param keyOfInterest The name of the key of interest
* @param isClosing Whether the caller is closing
* @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested
* in this {@code Region} and key
*/
public void unregisterClientInterest(String regionName, Object keyOfInterest,
final @NotNull InterestType interestType,
boolean isClosing, ClientProxyMembershipID membershipID, boolean keepalive) {
if (logger.isDebugEnabled()) {
logger.debug(
"CacheClientNotifier: Client {} unregistering interest in: {} -> {} (an instance of {})",
membershipID, regionName, keyOfInterest, keyOfInterest.getClass().getName());
}
CacheClientProxy proxy = getClientProxy(membershipID);
if (proxy != null) {
proxy.setKeepAlive(keepalive);
proxy.unregisterClientInterest(regionName, keyOfInterest, interestType, isClosing);
}
}
/**
* Registers client interest in the input region and list of keys.
*
* @param regionName The name of the region of interest
* @param keysOfInterest The list of keys of interest
* @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested
* in this {@code Region} and key
*/
public void registerClientInterest(final @NotNull String regionName,
final @NotNull List<?> keysOfInterest,
final @NotNull ClientProxyMembershipID membershipID,
final boolean isDurable,
final boolean sendUpdatesAsInvalidates,
final boolean manageEmptyRegions,
final @NotNull DataPolicy regionDataPolicy,
final boolean flushState)
throws IOException, RegionDestroyedException {
CacheClientProxy proxy = getClientProxy(membershipID, true);
if (logger.isDebugEnabled()) {
logger.debug("CacheClientNotifier: Client {} registering interest in: {} -> {}", proxy,
regionName, keysOfInterest);
}
if (proxy == null) {
throw new IOException(
"CacheClientProxy for this client is no longer on the server , so registerInterest operation is unsuccessful");
}
proxy.registerClientInterestList(regionName, keysOfInterest, isDurable,
sendUpdatesAsInvalidates, flushState);
if (manageEmptyRegions) {
updateMapOfEmptyRegions(proxy.getRegionsWithEmptyDataPolicy(), regionName, regionDataPolicy);
}
}
/**
* Unregisters client interest in the input region and list of keys.
*
* @param regionName The name of the region of interest
* @param keysOfInterest The list of keys of interest
* @param isClosing Whether the caller is closing
* @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested
* in this {@code Region} and key
*/
public void unregisterClientInterest(String regionName, List<?> keysOfInterest, boolean isClosing,
ClientProxyMembershipID membershipID, boolean keepalive) {
if (logger.isDebugEnabled()) {
logger.debug("CacheClientNotifier: Client {} unregistering interest in: {} -> {}",
membershipID, regionName, keysOfInterest);
}
CacheClientProxy proxy = getClientProxy(membershipID);
if (proxy != null) {
proxy.setKeepAlive(keepalive);
proxy.unregisterClientInterest(regionName, keysOfInterest, isClosing);
}
}
/**
* Returns the {@code CacheClientProxy} associated to the membershipID *
*
* @return the {@code CacheClientProxy} associated to the membershipID
*/
public CacheClientProxy getClientProxy(ClientProxyMembershipID membershipID) {
return _clientProxies.get(membershipID);
}
/**
* Returns the CacheClientProxy associated to the membershipID. This looks at both proxies that
* are initialized and those that are still in initialization mode.
*/
public CacheClientProxy getClientProxy(ClientProxyMembershipID membershipID,
boolean proxyInInitMode) {
CacheClientProxy proxy = getClientProxy(membershipID);
if (proxyInInitMode && proxy == null) {
proxy = _initClientProxies.get(membershipID);
}
return proxy;
}
/**
* Returns the {@code CacheClientProxy} associated to the durableClientId
*
* @return the {@code CacheClientProxy} associated to the durableClientId
*/
public CacheClientProxy getClientProxy(String durableClientId) {
return getClientProxy(durableClientId, false);
}
/**
* Returns the {@code CacheClientProxy} associated to the durableClientId. This version of
* the method can check for initializing proxies as well as fully initialized proxies.
*
* @return the {@code CacheClientProxy} associated to the durableClientId
*/
public CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) {
final boolean isDebugEnabled = logger.isDebugEnabled();
final boolean isTraceEnabled = logger.isTraceEnabled();
if (isDebugEnabled) {
logger.debug("CacheClientNotifier: Determining client for {}", durableClientId);
}
CacheClientProxy proxy = null;
for (CacheClientProxy clientProxy : getClientProxies()) {
if (isTraceEnabled) {
logger.trace("CacheClientNotifier: Checking client {}", clientProxy);
}
if (clientProxy.getDurableId().equals(durableClientId)) {
proxy = clientProxy;
if (isDebugEnabled) {
logger.debug("CacheClientNotifier: {} represents the durable client {}", proxy,
durableClientId);
}
break;
}
}
if (proxy == null && proxyInInitMode) {
for (final CacheClientProxy clientProxy : _initClientProxies.values()) {
if (isTraceEnabled) {
logger.trace("CacheClientNotifier: Checking initializing client {}", clientProxy);
}
if (clientProxy.getDurableId().equals(durableClientId)) {
proxy = clientProxy;
if (isDebugEnabled) {
logger.debug(
"CacheClientNotifier: initializing client {} represents the durable client {}",
proxy, durableClientId);
}
break;
}
}
}
return proxy;
}
/**
* It will remove the clients connected to the passed acceptorId. If its the only server, shuts
* down this instance.
*/
public synchronized void shutdown(long acceptorId) {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("At cache server shutdown time, the number of cache servers in the cache is {}",
getCache().getCacheServers().size());
}
Iterator<CacheClientProxy> it = _clientProxies.values().iterator();
// Close all the client proxies
while (it.hasNext()) {
CacheClientProxy proxy = it.next();
if (proxy.getAcceptorId() != acceptorId) {
continue;
}
it.remove();
try {
if (isDebugEnabled) {
logger.debug("CacheClientNotifier: Closing {}", proxy);
}
proxy.terminateDispatching(true);
} catch (Exception e) {
if (isDebugEnabled) {
logger.debug("{}: Exception in closing down the CacheClientProxy", this, e);
}
}
}
if (noActiveServer() && ccnSingleton != null) {
ccnSingleton = null;
if (haContainer != null) {
haContainer.cleanUp();
if (isDebugEnabled) {
logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName());
}
}
clearCompiledQueries();
denyListedClients.clear();
// cancel the ping task
clientPingTask.cancel();
// Close the statistics
statistics.close();
socketCloser.close();
}
}
private boolean noActiveServer() {
for (CacheServer server : getCache().getCacheServers()) {
if (server.isRunning()) {
return false;
}
}
return true;
}
/**
* Adds a new {@code CacheClientProxy} to the list of known client proxies
*
* @param proxy The {@code CacheClientProxy} to add
*/
void addClientProxy(CacheClientProxy proxy) {
getCache(); // ensure cache reference is up to date so firstclient state is correct
_clientProxies.put(proxy.getProxyID(), proxy);
// Remove this proxy from the init proxy list.
removeClientInitProxy(proxy);
_connectionListener.queueAdded(proxy.getProxyID());
if (!(proxy.clientConflation == Handshake.CONFLATION_ON)) {
// Delta not supported with conflation ON
ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
/*
* If the client connection init starts while cache/member is shutting down,
* ClientHealthMonitor.getInstance() might return null.
*/
if (chm != null) {
chm.numberOfClientsWithConflationOff.incrementAndGet();
}
}
timedOutDurableClientProxies.remove(proxy.getProxyID());
}
void addClientInitProxy(CacheClientProxy proxy) {
_initClientProxies.put(proxy.getProxyID(), proxy);
}
private void removeClientInitProxy(CacheClientProxy proxy) {
_initClientProxies.remove(proxy.getProxyID());
}
private boolean isProxyInInitializationMode(CacheClientProxy proxy) {
return _initClientProxies.containsKey(proxy.getProxyID());
}
/**
* Returns (possibly stale) set of memberIds for all clients being actively notified by this
* server.
*
* @return set of memberIds
*/
public Set<ClientProxyMembershipID> getActiveClients() {
Set<ClientProxyMembershipID> clients = new HashSet<>();
for (CacheClientProxy proxy : getClientProxies()) {
if (proxy.hasRegisteredInterested()) {
ClientProxyMembershipID proxyID = proxy.getProxyID();
clients.add(proxyID);
}
}
return clients;
}
/**
* Return (possibly stale) list of all clients and their status
*
* @return Map, with CacheClientProxy as a key and CacheClientStatus as a value
*/
public Map<ClientProxyMembershipID, CacheClientStatus> getAllClients() {
Map<ClientProxyMembershipID, CacheClientStatus> clients = new HashMap<>();
for (CacheClientProxy proxy : _clientProxies.values()) {
ClientProxyMembershipID proxyID = proxy.getProxyID();
clients.put(proxyID, new CacheClientStatus(proxyID));
}
return clients;
}
/**
* Checks if there is any proxy present for the given durable client
*
* @param durableId - id for the durable-client
* @return - true if a proxy is present for the given durable client
*
* @since GemFire 5.6
*/
public boolean hasDurableClient(String durableId) {
for (CacheClientProxy proxy : _clientProxies.values()) {
ClientProxyMembershipID proxyID = proxy.getProxyID();
if (durableId.equals(proxyID.getDurableId())) {
return true;
}
}
return false;
}
/**
* Checks if there is any proxy which is primary for the given durable client
*
* @param durableId - id for the durable-client
* @return - true if a primary proxy is present for the given durable client
*
* @since GemFire 5.6
*/
public boolean hasPrimaryForDurableClient(String durableId) {
for (CacheClientProxy proxy : _clientProxies.values()) {
ClientProxyMembershipID proxyID = proxy.getProxyID();
if (durableId.equals(proxyID.getDurableId())) {
return proxy.isPrimary();
}
}
return false;
}
/**
* Returns (possibly stale) map of queue sizes for all clients notified by this server.
*
* @return map with CacheClientProxy as key, and Integer as a value
*/
public Map<ClientProxyMembershipID, Integer> getClientQueueSizes() {
Map<ClientProxyMembershipID, Integer> queueSizes = new HashMap<>();
for (CacheClientProxy proxy : _clientProxies.values()) {
queueSizes.put(proxy.getProxyID(), proxy.getQueueSize());
}
return queueSizes;
}
public int getDurableClientHAQueueSize(String durableClientId) {
CacheClientProxy ccp = getClientProxy(durableClientId);
if (ccp == null) {
return -1;
}
return ccp.getQueueSizeStat();
}
// closes the cq and drains the queue
public boolean closeClientCq(String durableClientId, String clientCQName) throws CqException {
CacheClientProxy proxy = getClientProxy(durableClientId);
// close and drain
if (proxy != null) {
return proxy.closeClientCq(clientCQName);
}
return false;
}
/**
* Removes an existing {@code CacheClientProxy} from the list of known client proxies
*
* @param proxy The {@code CacheClientProxy} to remove
*/
void removeClientProxy(CacheClientProxy proxy) {
ClientProxyMembershipID client = proxy.getProxyID();
_clientProxies.remove(client);
_connectionListener.queueRemoved();
getCache().cleanupForClient(this, client);
if (!(proxy.clientConflation == Handshake.CONFLATION_ON)) {
ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
if (chm != null) {
chm.numberOfClientsWithConflationOff.decrementAndGet();
}
}
}
void durableClientTimedOut(ClientProxyMembershipID client) {
timedOutDurableClientProxies.add(client);
}
private boolean isTimedOut(ClientProxyMembershipID client) {
return timedOutDurableClientProxies.contains(client);
}
/**
* Returns an unmodifiable Collection of known {@code CacheClientProxy} instances. The
* collection is not static so its contents may change.
*
* @return the collection of known {@code CacheClientProxy} instances
*/
public Collection<CacheClientProxy> getClientProxies() {
return Collections.unmodifiableCollection(_clientProxies.values());
}
private void closeAllClientCqs(CacheClientProxy proxy) {
CqService cqService = proxy.getCache().getCqService();
if (cqService != null) {
final boolean isDebugEnabled = logger.isDebugEnabled(); // LocalizedMessage.create(
try {
if (isDebugEnabled) {
logger.debug("CacheClientNotifier: Closing client CQs: {}", proxy);
}
cqService.closeClientCqs(proxy.getProxyID());
} catch (CqException e1) {
logger.warn("Unable to close CQs for the client: {}", proxy.getProxyID());
if (isDebugEnabled) {
logger.debug(e1.getMessage(), e1);
}
}
}
}
/**
* Shuts down durable client proxy
*/
public boolean closeDurableClientProxy(String durableClientId) throws CacheException {
CacheClientProxy ccp = getClientProxy(durableClientId);
if (ccp == null) {
return false;
}
// we can probably remove the isPaused check
if (ccp.isPaused() && !ccp.isConnected()) {
ccp.setKeepAlive(false);
closeDeadProxies(Collections.singletonList(ccp), true);
return true;
}
if (logger.isDebugEnabled()) {
logger.debug("Cannot close running durable client: {}", durableClientId);
}
// TODO: never throw an anonymous inner class
throw new CacheException("Cannot close a running durable client : " + durableClientId) {};
}
/**
* Close dead {@code CacheClientProxy} instances
*
* @param deadProxies The list of {@code CacheClientProxy} instances to close
*/
private void closeDeadProxies(List<CacheClientProxy> deadProxies, boolean stoppedNormally) {
final boolean isDebugEnabled = logger.isDebugEnabled();
for (CacheClientProxy deadProxy : deadProxies) {
if (isDebugEnabled) {
logger.debug("CacheClientNotifier: Closing dead client: {}", deadProxy);
}
// Close the proxy
boolean keepProxy = false;
try {
keepProxy = deadProxy.close(false, stoppedNormally);
} catch (CancelException e) {
throw e;
} catch (Exception e) {
logger.warn("CacheClientNotifier: Caught exception attempting to close client: {}",
deadProxy,
e);
}
// Remove the proxy if necessary. It might not be necessary to remove the proxy if it is
// durable.
if (keepProxy) {
logger.info(
"CacheClientNotifier: Keeping proxy for durable client named {} for {} seconds {}.",
deadProxy.getDurableId(), deadProxy.getDurableTimeout(), deadProxy);
} else {
closeAllClientCqs(deadProxy);
if (isDebugEnabled) {
logger.debug("CacheClientNotifier: Not keeping proxy for non-durable client: {}",
deadProxy);
}
removeClientProxy(deadProxy);
}
deadProxy.notifyRemoval();
} // for
}
/**
* Registers a new {@code InterestRegistrationListener} with the set of
* {@code InterestRegistrationListener}s.
*
* @param listener The {@code InterestRegistrationListener} to register
*
* @since GemFire 5.8Beta
*/
public void registerInterestRegistrationListener(InterestRegistrationListener listener) {
writableInterestRegistrationListeners.add(listener);
}
/**
* Unregisters an existing {@code InterestRegistrationListener} from the set of
* {@code InterestRegistrationListener}s.
*
* @param listener The {@code InterestRegistrationListener} to unregister
*
* @since GemFire 5.8Beta
*/
public void unregisterInterestRegistrationListener(InterestRegistrationListener listener) {
writableInterestRegistrationListeners.remove(listener);
}
/**
* Returns a read-only collection of {@code InterestRegistrationListener}s registered with
* this notifier.
*
* @return a read-only collection of {@code InterestRegistrationListener}s registered with
* this notifier
*
* @since GemFire 5.8Beta
*/
public Set<InterestRegistrationListener> getInterestRegistrationListeners() {
return readableInterestRegistrationListeners;
}
/**
* @since GemFire 5.8Beta
*/
boolean containsInterestRegistrationListeners() {
return !writableInterestRegistrationListeners.isEmpty();
}
/**
* @since GemFire 5.8Beta
*/
void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
for (InterestRegistrationListener listener : writableInterestRegistrationListeners) {
if (event.isRegister()) {
listener.afterRegisterInterest(event);
} else {
listener.afterUnregisterInterest(event);
}
}
}
/**
* Test method used to determine the state of the CacheClientNotifier
*
* @return the statistics for the notifier
*/
public CacheClientNotifierStats getStats() {
return statistics;
}
/**
* Returns this {@code CacheClientNotifier}'s {@code InternalCache}.
*/
public InternalCache getCache() {
if (cache != null && cache.isClosed()) {
InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
this.cache = cache;
logWriter = cache.getInternalLogWriter();
securityLogWriter = cache.getSecurityInternalLogWriter();
}
}
return cache;
}
/**
* Returns this {@code CacheClientNotifier}'s maximum message count.
*
* @return this {@code CacheClientNotifier}'s maximum message count
*/
protected int getMaximumMessageCount() {
return maximumMessageCount;
}
/**
* Returns this {@code CacheClientNotifier}'s message time-to-live.
*
* @return this {@code CacheClientNotifier}'s message time-to-live
*/
protected int getMessageTimeToLive() {
return messageTimeToLive;
}
protected void handleInterestEvent(InterestRegistrationEvent event) {
InternalRegion region = (InternalRegion) event.getRegion();
region.handleInterestEvent(event);
}
/**
* @param cache The GemFire {@code InternalCache}
* @param listener a listener which should receive notifications abouts queues being added or
*/
private CacheClientNotifier(InternalCache cache,
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
StatisticsClock statisticsClock,
CacheServerStats acceptorStats,
int maximumMessageCount,
int messageTimeToLive,
ConnectionListener listener,
boolean isGatewayReceiver,
CacheClientProxyFactory cacheClientProxyFactory,
SocketMessageWriter socketMessageWriter) {
this.socketMessageWriter = socketMessageWriter;
this.cacheClientProxyFactory = cacheClientProxyFactory;
// Set the Cache
setCache(cache);
this.clientRegistrationEventQueueManager = clientRegistrationEventQueueManager;
this.statisticsClock = statisticsClock;
this.acceptorStats = acceptorStats;
// we only need one thread per client and wait 50ms for close
socketCloser = new SocketCloser(1, 50);
// Set the LogWriter
logWriter = (InternalLogWriter) cache.getLogger();
_connectionListener = listener;
// Set the security LogWriter
securityLogWriter = (InternalLogWriter) cache.getSecurityLogger();
this.maximumMessageCount = maximumMessageCount;
this.messageTimeToLive = messageTimeToLive;
// Initialize the statistics
StatisticsFactory factory;
if (isGatewayReceiver) {
factory = new DummyStatisticsFactory();
} else {
factory = getCache().getInternalDistributedSystem().getStatisticsManager();
}
statistics = new CacheClientNotifierStats(factory);
try {
logFrequency = Long.parseLong(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
if (logFrequency <= 0) {
logFrequency = DEFAULT_LOG_FREQUENCY;
}
} catch (Exception e) {
logFrequency = DEFAULT_LOG_FREQUENCY;
}
eventEnqueueWaitTime =
Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
if (eventEnqueueWaitTime < 0) {
eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
}
// Schedule task to periodically ping clients.
scheduleClientPingTask();
}
void deliverInterestChange(ClientProxyMembershipID proxyID,
ClientInterestMessageImpl message) {
DistributionManager dm = ((InternalDistributedSystem) getCache().getDistributedSystem())
.getDistributionManager();
ServerInterestRegistrationMessage.sendInterestChange(dm, proxyID, message);
}
CacheServerStats getAcceptorStats() {
return acceptorStats;
}
SocketCloser getSocketCloser() {
return socketCloser;
}
public void addCompiledQuery(DefaultQuery query) {
if (compiledQueries.putIfAbsent(query.getQueryString(), query) == null) {
// Added successfully.
statistics.incCompiledQueryCount(1);
if (logger.isDebugEnabled()) {
logger.debug(
"Added compiled query into ccn.compliedQueries list. Query: {}. Total compiled queries: {}",
query.getQueryString(), statistics.getCompiledQueryCount());
}
// Start the clearIdleCompiledQueries thread.
startCompiledQueryCleanupThread();
}
}
public Query getCompiledQuery(String queryString) {
return compiledQueries.get(queryString);
}
private void clearCompiledQueries() {
if (!compiledQueries.isEmpty()) {
statistics.incCompiledQueryCount(-compiledQueries.size());
compiledQueries.clear();
if (logger.isDebugEnabled()) {
logger.debug(
"Removed all compiled queries from ccn.compliedQueries list. Total compiled queries: {}",
statistics.getCompiledQueryCount());
}
}
}
/**
* This starts the cleanup thread that periodically (DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME)
* checks for the compiled queries that are not used and removes them.
*/
private void startCompiledQueryCleanupThread() {
if (isCompiledQueryCleanupThreadStarted) {
return;
}
SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() {
@Override
public void run2() {
final boolean isDebugEnabled = logger.isDebugEnabled();
for (Map.Entry<String, DefaultQuery> e : compiledQueries.entrySet()) {
DefaultQuery q = e.getValue();
// Check if the query last used flag.
// If its true set it to false. If its false it means it is not used
// from the its last checked.
if (q.getLastUsed()) {
q.setLastUsed(false);
} else {
if (compiledQueries.remove(e.getKey()) != null) {
// If successfully removed decrement the counter.
statistics.incCompiledQueryCount(-1);
if (isDebugEnabled) {
logger.debug("Removed compiled query from ccn.compliedQueries list. Query: "
+ q.getQueryString() + ". Total compiled queries are : "
+ statistics.getCompiledQueryCount());
}
}
}
}
}
};
synchronized (lockIsCompiledQueryCleanupThreadStarted) {
if (!isCompiledQueryCleanupThreadStarted) {
long period = DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME > 0
? DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME : DefaultQuery.COMPILED_QUERY_CLEAR_TIME;
cache.getCCPTimer().scheduleAtFixedRate(task, period, period);
}
isCompiledQueryCleanupThreadStarted = true;
}
}
void scheduleClientPingTask() {
clientPingTask = new SystemTimer.SystemTimerTask() {
@Override
public void run2() {
// If there are no proxies, return
if (_clientProxies.isEmpty()) {
return;
}
// Create ping message
ClientMessage message = new ClientPingMessageImpl();
// Determine clients to ping
for (CacheClientProxy proxy : getClientProxies()) {
logger.debug("Checking whether to ping {}", proxy);
// Send the ping message directly to the client. Do not qo through
// the queue. If the queue were used, the secondary connection would
// not be pinged. Instead, pings would just build up in secondary
// queue and never be sent. The counter is used to help scalability.
// If normal messages are sent by the proxy, then the counter will
// be reset and no pings will be sent.
if (proxy.incrementAndGetPingCounter() >= CLIENT_PING_TASK_COUNTER) {
logger.debug("Pinging {}", proxy);
proxy.sendMessageDirectly(message);
logger.debug("Done pinging {}", proxy);
} else {
logger.debug("Not pinging because not idle: {}", proxy);
}
}
}
};
if (logger.isDebugEnabled()) {
logger.debug("Scheduling client ping task with period={} ms", CLIENT_PING_TASK_PERIOD);
}
cache.getCCPTimer().scheduleAtFixedRate(clientPingTask,
CLIENT_PING_TASK_PERIOD, CLIENT_PING_TASK_PERIOD);
}
/**
* The map of known {@code CacheClientProxy} instances. Maps ClientProxyMembershipID to
* CacheClientProxy. Note that the keys in this map are not updated when a durable client
* reconnects. To make sure you get the updated ClientProxyMembershipID use this map to lookup the
* CacheClientProxy and then call getProxyID on it.
*/
private final ConcurrentMap<ClientProxyMembershipID, CacheClientProxy> _clientProxies =
new ConcurrentHashMap<>();
/**
* The map of {@code CacheClientProxy} instances which are getting initialized. Maps
* ClientProxyMembershipID to CacheClientProxy.
*/
private final ConcurrentMap<ClientProxyMembershipID, CacheClientProxy> _initClientProxies =
new ConcurrentHashMap<>();
private final Set<ClientProxyMembershipID> timedOutDurableClientProxies = new HashSet<>();
/**
* The GemFire {@code InternalCache}. Note that since this is a singleton class you should
* not use a direct reference to cache in CacheClientNotifier code. Instead, you should always use
* {@code getCache()}
*/
private InternalCache cache;
private InternalLogWriter logWriter;
/**
* The GemFire security {@code LogWriter}
*/
private InternalLogWriter securityLogWriter;
/** the maximum number of messages that can be enqueued in a client-queue. */
private final int maximumMessageCount;
/**
* the time (in seconds) after which a message in the client queue will expire.
*/
private final int messageTimeToLive;
/**
* A listener which receives notifications about queues that are added or removed
*/
private final ConnectionListener _connectionListener;
private final StatisticsClock statisticsClock;
private final CacheServerStats acceptorStats;
/**
* haContainer can hold either the name of the client-messages-region (in case of eviction
* policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In
* both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value.
*/
private volatile HAContainerWrapper haContainer;
/**
* The size of the server-to-client communication socket buffers. This can be modified using the
* BridgeServer.SOCKET_BUFFER_SIZE system property.
*/
private static final int socketBufferSize =
Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768);
/**
* The statistics for this notifier
*/
protected final CacheClientNotifierStats statistics;
/**
* The {@code InterestRegistrationListener} instances registered in this VM. This is used
* when modifying the set of listeners.
*/
private final Set<InterestRegistrationListener> writableInterestRegistrationListeners =
new CopyOnWriteArraySet<>();
/**
* The {@code InterestRegistrationListener} instances registered in this VM. This is used to
* provide a read-only {@code Set} of listeners.
*/
private final Set<InterestRegistrationListener> readableInterestRegistrationListeners =
Collections.unmodifiableSet(writableInterestRegistrationListeners);
/**
* System property name for indicating how much frequently the "Queue full" message should be
* logged.
*/
private static final String MAX_QUEUE_LOG_FREQUENCY =
GeodeGlossary.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit";
public static final long DEFAULT_LOG_FREQUENCY = 1000;
private static final String EVENT_ENQUEUE_WAIT_TIME_NAME =
GeodeGlossary.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME";
private static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100;
/**
* System property value denoting the time in milliseconds. Any thread putting an event into a
* subscription queue, which is full, will wait this much time for the queue to make space. It'll
* then enque the event possibly causing the queue to grow beyond its capacity/max-size. See
* #51400.
*/
@MakeNotStatic
public static int eventEnqueueWaitTime;
/**
* The frequency of logging the "Queue full" message.
*/
private long logFrequency;
private final ConcurrentHashMap<String, DefaultQuery> compiledQueries =
new ConcurrentHashMap<>();
private volatile boolean isCompiledQueryCleanupThreadStarted = false;
private final Object lockIsCompiledQueryCleanupThreadStarted = new Object();
private SystemTimer.SystemTimerTask clientPingTask;
private final SocketCloser socketCloser;
private static final int CLIENT_PING_TASK_PERIOD =
Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000);
private static final long CLIENT_PING_TASK_COUNTER =
Long.getLong(GeodeGlossary.GEMFIRE_PREFIX + "serverToClientPingCounter", 3);
public long getLogFrequency() {
return logFrequency;
}
/** returns the interval between "ping" messages sent to clients on idle connections */
static int getClientPingInterval() {
return CLIENT_PING_TASK_PERIOD;
}
/**
* @return the haContainer
*/
public Map<?, ?> getHaContainer() {
return haContainer;
}
private void initHaContainer(OverflowAttributes overflowAttributes) {
// lazily initialize haContainer in case this CCN instance was created by a gateway receiver
if (overflowAttributes != null
&& !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributes.getEvictionPolicy())) {
haContainer = new HAContainerRegion(cache.getRegion(SEPARATOR
+ CacheServerImpl.clientMessagesRegion(
cache,
overflowAttributes.getEvictionPolicy(),
overflowAttributes.getQueueCapacity(),
overflowAttributes.getPort(),
overflowAttributes.isDiskStore() ? overflowAttributes.getDiskStoreName()
: overflowAttributes.getOverflowDirectory(),
overflowAttributes.isDiskStore())));
} else {
haContainer = new HAContainerMap(new ConcurrentHashMap<>());
}
assert haContainer != null;
if (logger.isDebugEnabled()) {
logger.debug("ha container ({}) has been created.", haContainer.getName());
}
}
private final Set<ClientProxyMembershipID> denyListedClients = new CopyOnWriteArraySet<>();
void addToDenylistedClient(ClientProxyMembershipID proxyID) {
denyListedClients.add(proxyID);
// ensure that cache and distributed system state are current and open
getCache();
new ScheduledThreadPoolExecutor(1).schedule(new ExpireDenyListTask(proxyID), 120,
TimeUnit.SECONDS);
}
Set<ClientProxyMembershipID> getDenylistedClient() {
return denyListedClients;
}
/**
* @param _cache the cache to set
*/
private void setCache(InternalCache _cache) {
cache = _cache;
}
private class ExpireDenyListTask extends PoolTask {
private final ClientProxyMembershipID proxyID;
ExpireDenyListTask(ClientProxyMembershipID proxyID) {
this.proxyID = proxyID;
}
@Override
public void run2() {
if (denyListedClients.remove(proxyID)) {
if (logger.isDebugEnabled()) {
logger.debug("{} client is no longer denylisted", proxyID);
}
}
}
}
@VisibleForTesting
public static CacheClientNotifierProvider singletonProvider() {
return CacheClientNotifier::getInstance;
}
@VisibleForTesting
public static Supplier<CacheClientNotifier> singletonGetter() {
return CacheClientNotifier::getInstance;
}
@FunctionalInterface
@VisibleForTesting
public interface CacheClientNotifierProvider {
CacheClientNotifier get(InternalCache cache,
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
StatisticsClock statisticsClock,
CacheServerStats acceptorStats, int maximumMessageCount, int messageTimeToLive,
ConnectionListener listener, OverflowAttributes overflowAttributes,
boolean isGatewayReceiver);
}
@VisibleForTesting
public void addClientProxyToMap(CacheClientProxy proxy) {
_clientProxies.put(proxy.getProxyID(), proxy);
}
}