| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| |
| package com.gemstone.gemfire.internal.cache.tier.sockets; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.DataInput; |
| import java.io.DataInputStream; |
| import java.io.DataOutput; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.lang.reflect.Method; |
| import java.net.Socket; |
| import java.net.SocketAddress; |
| import java.security.Principal; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CopyOnWriteArraySet; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.CancelException; |
| import com.gemstone.gemfire.DataSerializer; |
| import com.gemstone.gemfire.Instantiator; |
| import com.gemstone.gemfire.InternalGemFireError; |
| import com.gemstone.gemfire.StatisticsFactory; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.CacheEvent; |
| import com.gemstone.gemfire.cache.CacheException; |
| import com.gemstone.gemfire.cache.InterestRegistrationEvent; |
| import com.gemstone.gemfire.cache.InterestRegistrationListener; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.RegionExistsException; |
| import com.gemstone.gemfire.cache.UnsupportedVersionException; |
| import com.gemstone.gemfire.cache.client.internal.PoolImpl; |
| import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask; |
| import com.gemstone.gemfire.cache.query.CqException; |
| import com.gemstone.gemfire.cache.query.Query; |
| import com.gemstone.gemfire.cache.query.internal.DefaultQuery; |
| import com.gemstone.gemfire.cache.query.internal.cq.CqService; |
| import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; |
| import com.gemstone.gemfire.cache.query.internal.cq.ServerCQ; |
| import com.gemstone.gemfire.cache.server.CacheServer; |
| import com.gemstone.gemfire.distributed.DistributedMember; |
| import com.gemstone.gemfire.distributed.DistributedSystem; |
| import com.gemstone.gemfire.distributed.internal.DM; |
| import com.gemstone.gemfire.distributed.internal.DistributionConfig; |
| import com.gemstone.gemfire.distributed.internal.DistributionManager; |
| import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage; |
| import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; |
| import com.gemstone.gemfire.distributed.internal.MessageWithReply; |
| import com.gemstone.gemfire.distributed.internal.ReplyMessage; |
| import com.gemstone.gemfire.distributed.internal.ReplyProcessor21; |
| import com.gemstone.gemfire.internal.ClassLoadUtil; |
| import com.gemstone.gemfire.internal.DummyStatisticsFactory; |
| import com.gemstone.gemfire.internal.InternalDataSerializer; |
| import com.gemstone.gemfire.internal.InternalInstantiator; |
| import com.gemstone.gemfire.internal.SocketUtils; |
| import com.gemstone.gemfire.internal.SystemTimer; |
| import com.gemstone.gemfire.internal.Version; |
| import com.gemstone.gemfire.internal.cache.BridgeObserver; |
| import com.gemstone.gemfire.internal.cache.BridgeObserverHolder; |
| import com.gemstone.gemfire.internal.cache.BridgeRegionEventImpl; |
| import com.gemstone.gemfire.internal.cache.BridgeServerImpl; |
| import com.gemstone.gemfire.internal.cache.CacheClientStatus; |
| import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor; |
| import com.gemstone.gemfire.internal.cache.CachedDeserializable; |
| import com.gemstone.gemfire.internal.cache.Conflatable; |
| import com.gemstone.gemfire.internal.cache.DistributedRegion; |
| import com.gemstone.gemfire.internal.cache.EntryEventImpl; |
| import com.gemstone.gemfire.internal.cache.EnumListenerEvent; |
| import com.gemstone.gemfire.internal.cache.EventID; |
| import com.gemstone.gemfire.internal.cache.FilterProfile; |
| import com.gemstone.gemfire.internal.cache.EntryEventImpl.SerializedCacheValueImpl; |
| import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo; |
| import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; |
| import com.gemstone.gemfire.internal.cache.InternalCacheEvent; |
| import com.gemstone.gemfire.internal.cache.LocalRegion; |
| import com.gemstone.gemfire.internal.cache.RegionEventImpl; |
| import com.gemstone.gemfire.internal.cache.ha.HAContainerMap; |
| import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion; |
| import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper; |
| import com.gemstone.gemfire.internal.cache.ha.HARegionQueue; |
| import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier; |
| import com.gemstone.gemfire.internal.cache.tier.Acceptor; |
| import com.gemstone.gemfire.internal.cache.tier.MessageType; |
| import com.gemstone.gemfire.internal.cache.versions.VersionTag; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.InternalLogWriter; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| import com.gemstone.gemfire.security.AccessControl; |
| import com.gemstone.gemfire.security.AuthenticationFailedException; |
| import com.gemstone.gemfire.security.AuthenticationRequiredException; |
| |
| /** |
| * Class <code>CacheClientNotifier</code> works on the server and manages |
| * client socket connections to clients requesting notification of updates and |
| * notifies them when updates occur. |
| * |
| * @author Barry Oglesby |
| * |
| * @since 3.2 |
| */ |
| @SuppressWarnings({"synthetic-access", "deprecation"}) |
| public class CacheClientNotifier { |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static volatile CacheClientNotifier ccnSingleton; |
| |
| /** |
| * Factory method to construct a CacheClientNotifier |
| * <code>CacheClientNotifier</code> instance. |
| * |
| * @param cache |
| * The GemFire <code>Cache</code> |
| * @param acceptorStats |
| * @param maximumMessageCount |
| * @param messageTimeToLive |
| * @param transactionTimeToLive - ttl for txstates for disconnected clients |
| * @param listener |
| * @param overflowAttributesList |
| * @return A <code>CacheClientNotifier</code> instance |
| */ |
| public static synchronized CacheClientNotifier getInstance(Cache cache, |
| CacheServerStats acceptorStats, |
| int maximumMessageCount, int messageTimeToLive, |
| int transactionTimeToLive, |
| ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) |
| { |
| if (ccnSingleton == null) { |
| ccnSingleton = new CacheClientNotifier(cache, acceptorStats, maximumMessageCount, |
| messageTimeToLive, transactionTimeToLive, |
| listener, overflowAttributesList, isGatewayReceiver); |
| } |
| // else { |
| // ccnSingleton.acceptorStats = acceptorStats; |
| // ccnSingleton.maximumMessageCount = maximumMessageCount; |
| // ccnSingleton.messageTimeToLive = messageTimeToLive; |
| // ccnSingleton._connectionListener = listener; |
| // ccnSingleton.setCache((GemFireCache)cache); |
| // } |
| return ccnSingleton; |
| } |
| |
| public static CacheClientNotifier getInstance(){ |
| return ccnSingleton; |
| } |
| |
| /** the amount of time in seconds to keep a disconnected client's txstates around */ |
| private final int transactionTimeToLive; |
| |
| /** |
| * Writes a given message to the output stream |
| * |
| * @param dos |
| * the <code>DataOutputStream</code> to use for writing the |
| * message |
| * @param type |
| * a byte representing the message type |
| * @param p_msg |
| * the message to be written; can be null |
| * @param clientVersion |
| * |
| */ |
| private void writeMessage(DataOutputStream dos, byte type, String p_msg, Version clientVersion ) |
| throws IOException { |
| writeMessage(dos, type, p_msg, clientVersion, (byte)0x00, 0); |
| } |
| |
| private void writeMessage(DataOutputStream dos, byte type, String p_msg, |
| Version clientVersion, byte epType, int qSize) throws IOException { |
| String msg = p_msg; |
| |
| // write the message type |
| dos.writeByte(type); |
| |
| // dummy epType |
| dos.writeByte(epType); |
| // dummy qSize |
| dos.writeInt(qSize); |
| |
| if (msg == null) { |
| msg = ""; |
| } |
| dos.writeUTF(msg); |
| if (clientVersion != null |
| && clientVersion.compareTo(Version.GFE_61) >= 0) { |
| // get all the instantiators. |
| Instantiator[] instantiators = InternalInstantiator.getInstantiators(); |
| HashMap instantiatorMap = new HashMap(); |
| if (instantiators != null && instantiators.length > 0) { |
| for (Instantiator instantiator : instantiators) { |
| ArrayList instantiatorAttributes = new ArrayList(); |
| instantiatorAttributes.add(instantiator.getClass().toString() |
| .substring(6)); |
| instantiatorAttributes.add(instantiator.getInstantiatedClass() |
| .toString().substring(6)); |
| instantiatorMap.put(instantiator.getId(), instantiatorAttributes); |
| } |
| } |
| DataSerializer.writeHashMap(instantiatorMap, dos); |
| |
| // get all the dataserializers. |
| DataSerializer[] dataSerializers = InternalDataSerializer |
| .getSerializers(); |
| HashMap<Integer, ArrayList<String>> dsToSupportedClasses = new HashMap<Integer, ArrayList<String>>(); |
| HashMap<Integer, String> dataSerializersMap = new HashMap<Integer, String>(); |
| if (dataSerializers != null && dataSerializers.length > 0) { |
| for (DataSerializer dataSerializer : dataSerializers) { |
| dataSerializersMap.put(dataSerializer.getId(), dataSerializer |
| .getClass().toString().substring(6)); |
| if (clientVersion.compareTo(Version.GFE_6516) >= 0) { |
| ArrayList<String> supportedClassNames = new ArrayList<String>(); |
| for (Class clazz : dataSerializer.getSupportedClasses()) { |
| supportedClassNames.add(clazz.getName()); |
| } |
| dsToSupportedClasses.put(dataSerializer.getId(), supportedClassNames); |
| } |
| } |
| } |
| DataSerializer.writeHashMap(dataSerializersMap, dos); |
| if (clientVersion.compareTo(Version.GFE_6516) >= 0) { |
| DataSerializer.writeHashMap(dsToSupportedClasses, dos); |
| } |
| } |
| dos.flush(); |
| } |
| |
| /** |
| * Writes an exception message to the socket |
| * |
| * @param dos |
| * the <code>DataOutputStream</code> to use for writing the |
| * message |
| * @param type |
| * a byte representing the exception type |
| * @param ex |
| * the exception to be written; should not be null |
| * @param clientVersion |
| * |
| */ |
| private void writeException(DataOutputStream dos, byte type, Exception ex, Version clientVersion) |
| throws IOException { |
| |
| writeMessage(dos, type, ex.toString(), clientVersion); |
| } |
| |
| // /** |
| // * Factory method to return the singleton <code>CacheClientNotifier</code> |
| // * instance. |
| // * @return the singleton <code>CacheClientNotifier</code> instance |
| // */ |
| // public static CacheClientNotifier getInstance() |
| // { |
| // return _instance; |
| // } |
| |
| // /** |
| // * Shuts down the singleton <code>CacheClientNotifier</code> instance. |
| // */ |
| // public static void shutdownInstance() |
| // { |
| // if (_instance == null) return; |
| // _instance.shutdown(); |
| // _instance = null; |
| // } |
| |
| /** |
| * Registers a new client updater that wants to receive updates with this |
| * server. |
| * |
| * @param socket |
| * The socket over which the server communicates with the client. |
| */ |
| public void registerClient(Socket socket, boolean isPrimary, long acceptorId, |
| boolean notifyBySubscription) |
| throws IOException |
| { |
| // Since no remote ports were specified in the message, wait for them. |
| long startTime = this._statistics.startTime(); |
| DataInputStream dis = new DataInputStream(SocketUtils.getInputStream(socket));//socket.getInputStream()); |
| DataOutputStream dos = new DataOutputStream(SocketUtils.getOutputStream(socket));//socket.getOutputStream()); |
| |
| // Read the client version |
| short clientVersionOrdinal = Version.readOrdinal(dis); |
| Version clientVersion = null; |
| try { |
| clientVersion = Version.fromOrdinal(clientVersionOrdinal, true); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Registering client with version: {}", this, clientVersion); |
| } |
| } |
| catch (UnsupportedVersionException e) { |
| SocketAddress sa = socket.getRemoteSocketAddress(); |
| UnsupportedVersionException uve = e; |
| if (sa != null) { |
| String sInfo = " Client: " + sa.toString() + "."; |
| uve = new UnsupportedVersionException(e.getMessage() + sInfo); |
| } |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_CAUGHT_EXCEPTION_ATTEMPTING_TO_CLIENT), uve); |
| writeException(dos, Acceptor.UNSUCCESSFUL_SERVER_TO_CLIENT, uve, clientVersion); |
| return; |
| } |
| |
| // Read and ignore the reply code. This is used on the client to server |
| // handshake. |
| dis.readByte(); // replyCode |
| |
| if (Version.GFE_57.compareTo(clientVersion) <= 0) { |
| registerGFEClient(dis, dos, socket, isPrimary, startTime, clientVersion, |
| acceptorId, notifyBySubscription); |
| } else { |
| Exception e = new UnsupportedVersionException(clientVersionOrdinal); |
| throw new IOException(e.toString()); |
| } |
| } |
| |
| protected void registerGFEClient(DataInputStream dis, DataOutputStream dos, |
| Socket socket, boolean isPrimary, long startTime, Version clientVersion, |
| long acceptorId, boolean notifyBySubscription) throws IOException { |
| // Read the ports and throw them away. We no longer need them |
| int numberOfPorts = dis.readInt(); |
| for (int i = 0; i < numberOfPorts; i++) { |
| dis.readInt(); |
| } |
| // Read the handshake identifier and convert it to a string member id |
| ClientProxyMembershipID proxyID = null; |
| CacheClientProxy proxy; |
| AccessControl authzCallback = null; |
| byte clientConflation = HandShake.CONFLATION_DEFAULT; |
| try { |
| proxyID = ClientProxyMembershipID.readCanonicalized(dis); |
| if (getBlacklistedClient().contains(proxyID)) { |
| writeException(dos, HandShake.REPLY_INVALID, new Exception( |
| "This client is blacklisted by server"), clientVersion); |
| return; |
| } |
| proxy = getClientProxy(proxyID); |
| DistributedMember member = proxyID.getDistributedMember(); |
| |
| DistributedSystem system = this.getCache().getDistributedSystem(); |
| Properties sysProps = system.getProperties(); |
| String authenticator = sysProps |
| .getProperty(DistributionConfig.SECURITY_CLIENT_AUTHENTICATOR_NAME); |
| //TODO;hitesh for conflation |
| if (clientVersion.compareTo(Version.GFE_603) >= 0) { |
| byte[] overrides = HandShake.extractOverrides(new byte[] { (byte) dis.read() }); |
| |
| clientConflation = overrides[0]; |
| |
| } else { |
| clientConflation = (byte) dis.read(); |
| } |
| |
| switch (clientConflation) { |
| case HandShake.CONFLATION_DEFAULT: |
| case HandShake.CONFLATION_OFF: |
| case HandShake.CONFLATION_ON: |
| break; |
| default: |
| writeException(dos, HandShake.REPLY_INVALID, |
| new IllegalArgumentException("Invalid conflation byte"), clientVersion); |
| return; |
| } |
| |
| |
| //TODO:hitesh |
| Properties credentials = HandShake.readCredentials(dis, dos, |
| authenticator, system); |
| if (credentials != null) { |
| if (securityLogWriter.fineEnabled()) { |
| securityLogWriter.fine("CacheClientNotifier: verifying credentials for proxyID: " + proxyID); |
| } |
| Principal principal = HandShake.verifyCredentials(authenticator, |
| credentials, system.getSecurityProperties(), this.logWriter, |
| this.securityLogWriter, member); |
| if (securityLogWriter.fineEnabled()) { |
| securityLogWriter.fine("CacheClientNotifier: successfully verified credentials for proxyID: " + proxyID + " having principal: " + principal.getName()); |
| } |
| String postAuthzFactoryName = sysProps |
| .getProperty(DistributionConfig.SECURITY_CLIENT_ACCESSOR_PP_NAME); |
| if (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) { |
| if (principal == null) { |
| securityLogWriter.warning(LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_POST_PROCESS_AUTHORIZATION_CALLBACK_ENABLED_BUT_AUTHENTICATION_CALLBACK_0_RETURNED_WITH_NULL_CREDENTIALS_FOR_PROXYID_1, new Object[] {DistributionConfig.SECURITY_CLIENT_AUTHENTICATOR_NAME, proxyID}); |
| } |
| Method authzMethod = ClassLoadUtil |
| .methodFromName(postAuthzFactoryName); |
| authzCallback = (AccessControl)authzMethod.invoke(null, |
| (Object[])null); |
| authzCallback.init(principal, member, this.getCache()); |
| } |
| } |
| } |
| catch (ClassNotFoundException e) { |
| |
| throw new IOException(LocalizedStrings.CacheClientNotifier_CLIENTPROXYMEMBERSHIPID_OBJECT_COULD_NOT_BE_CREATED_EXCEPTION_OCCURRED_WAS_0.toLocalizedString(e)); |
| } |
| catch (AuthenticationRequiredException ex) { |
| securityLogWriter.warning(LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, new Object[] {proxyID, ex}); |
| writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_REQUIRED, ex, clientVersion); |
| return; |
| } |
| catch (AuthenticationFailedException ex) { |
| securityLogWriter.warning(LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, new Object[] {proxyID, ex}); |
| writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_FAILED, ex, clientVersion); |
| return; |
| } |
| catch (Exception ex) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, new Object[] {proxyID, ""}), ex); |
| writeException(dos, Acceptor.UNSUCCESSFUL_SERVER_TO_CLIENT, ex, clientVersion); |
| return; |
| } |
| try { |
| proxy = registerClient(socket, proxyID, proxy, isPrimary, clientConflation, |
| clientVersion, acceptorId, notifyBySubscription); |
| } |
| catch (CacheException e) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_0_REGISTERCLIENT_EXCEPTION_ENCOUNTERED_IN_REGISTRATION_1, new Object[] {this, e}), e); |
| IOException io = new IOException(LocalizedStrings.CacheClientNotifier_EXCEPTION_OCCURRED_WHILE_TRYING_TO_REGISTER_INTEREST_DUE_TO_0.toLocalizedString(e.getMessage())); |
| io.initCause(e); |
| throw io; |
| } |
| if (authzCallback != null && proxy != null) { |
| proxy.setPostAuthzCallback(authzCallback); |
| } |
| this._statistics.endClientRegistration(startTime); |
| } |
| |
| |
| /** |
| * Registers a new client that wants to receive updates with this server. |
| * |
| * @param socket |
| * The socket over which the server communicates with the |
| * client. |
| * @param proxyId |
| * The distributed member id of the client being registered |
| * @param proxy |
| * The <code>CacheClientProxy</code> of the given |
| * <code>proxyId</code> |
| * |
| * @return CacheClientProxy for the registered client |
| */ |
| private CacheClientProxy registerClient(Socket socket, |
| ClientProxyMembershipID proxyId, CacheClientProxy proxy, |
| boolean isPrimary, byte clientConflation, Version clientVersion, |
| long acceptorId, boolean notifyBySubscription) throws IOException, CacheException { |
| CacheClientProxy l_proxy = proxy; |
| |
| // Initialize the socket |
| socket.setTcpNoDelay(true); |
| socket.setSendBufferSize(CacheClientNotifier.socketBufferSize); |
| socket.setReceiveBufferSize(CacheClientNotifier.socketBufferSize); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: Initialized server-to-client socket with send buffer size: {} bytes and receive buffer size: {} bytes", socket.getSendBufferSize(), socket.getReceiveBufferSize()); |
| } |
| |
| // Determine whether the client is durable or not. |
| byte responseByte = Acceptor.SUCCESSFUL_SERVER_TO_CLIENT; |
| String unsuccessfulMsg = null; |
| boolean successful = true; |
| boolean clientIsDurable = proxyId.isDurable(); |
| if (logger.isDebugEnabled()) { |
| if (clientIsDurable) { |
| logger.debug("CacheClientNotifier: Attempting to register durable client: {}", proxyId.getDurableId()); |
| } else { |
| logger.debug("CacheClientNotifier: Attempting to register non-durable client"); |
| } |
| } |
| |
| byte epType = 0x00; |
| int qSize = 0; |
| if (clientIsDurable) { |
| if (l_proxy == null) { |
| if (isTimedOut(proxyId)) { |
| qSize = PoolImpl.PRIMARY_QUEUE_TIMED_OUT; |
| } else { |
| qSize = PoolImpl.PRIMARY_QUEUE_NOT_AVAILABLE; |
| } |
| // No proxy exists for this durable client. It must be created. |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: No proxy exists for durable client with id {}. It must be created.", proxyId.getDurableId()); |
| } |
| l_proxy = new CacheClientProxy(this, socket, proxyId, |
| isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription); |
| successful = this.initializeProxy(l_proxy); |
| } else { |
| if (proxy.isPrimary()) { |
| epType = (byte) 2; |
| } else { |
| epType = (byte) 1; |
| } |
| qSize = proxy.getQueueSize(); |
| // A proxy exists for this durable client. It must be reinitialized. |
| if (l_proxy.isPaused()) { |
| if (CacheClientProxy.testHook != null) { |
| CacheClientProxy.testHook.doTestHook("CLIENT_PRE_RECONNECT"); |
| } |
| if (l_proxy.lockDrain()) { |
| try { |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: A proxy exists for durable client with id {}. This proxy will be reinitialized: {}", proxyId.getDurableId(), l_proxy); |
| } |
| this._statistics.incDurableReconnectionCount(); |
| l_proxy.getProxyID().updateDurableTimeout(proxyId.getDurableTimeout()); |
| l_proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, |
| clientConflation, clientVersion); |
| l_proxy.setMarkerEnqueued(true); |
| if (CacheClientProxy.testHook != null) { |
| CacheClientProxy.testHook.doTestHook("CLIENT_RECONNECTED"); |
| } |
| } |
| finally { |
| l_proxy.unlockDrain(); |
| } |
| } |
| else { |
| unsuccessfulMsg = LocalizedStrings.CacheClientNotifier_COULD_NOT_CONNECT_DUE_TO_CQ_BEING_DRAINED.toLocalizedString(); |
| logger.warn(unsuccessfulMsg); |
| responseByte = HandShake.REPLY_REFUSED; |
| if (CacheClientProxy.testHook != null) { |
| CacheClientProxy.testHook.doTestHook("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED"); |
| } |
| } |
| } else { |
| // The existing proxy is already running (which means that another |
| // client is already using this durable id. |
| unsuccessfulMsg = LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_THE_REQUESTED_DURABLE_CLIENT_HAS_THE_SAME_IDENTIFIER__0__AS_AN_EXISTING_DURABLE_CLIENT__1__DUPLICATE_DURABLE_CLIENTS_ARE_NOT_ALLOWED.toLocalizedString(new Object[] {proxyId.getDurableId(), proxy}); |
| logger.warn(unsuccessfulMsg); |
| // Set the unsuccessful response byte. |
| responseByte = HandShake.REPLY_EXCEPTION_DUPLICATE_DURABLE_CLIENT; |
| } |
| } |
| } else { |
| CacheClientProxy staleClientProxy = this.getClientProxy(proxyId); |
| if (staleClientProxy != null) { |
| // A proxy exists for this non-durable client. It must be closed. |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: A proxy exists for this non-durable client. It must be closed."); |
| } |
| if (staleClientProxy.startRemoval()) { |
| staleClientProxy.waitRemoval(); |
| } |
| else { |
| staleClientProxy.close(false, false); // do not check for queue, just close it |
| removeClientProxy(staleClientProxy); // remove old proxy from proxy set |
| } |
| } // non-null stale proxy |
| |
| // Create the new proxy for this non-durable client |
| l_proxy = new CacheClientProxy(this, socket, proxyId, |
| isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription); |
| successful = this.initializeProxy(l_proxy); |
| } |
| |
| if (!successful){ |
| l_proxy = null; |
| responseByte = HandShake.REPLY_REFUSED; |
| unsuccessfulMsg = LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_A_PREVIOUS_CONNECTION_ATTEMPT_FROM_THIS_CLIENT_IS_STILL_BEING_PROCESSED__0.toLocalizedString(new Object[] {proxyId}); |
| } |
| |
| // Tell the client that the proxy has been registered using the response |
| // byte. This byte will be read on the client by the CacheClientUpdater to |
| // determine whether the registration was successful. The times when |
| // registration is unsuccessful currently are if a duplicate durable client |
| // is attempted to be registered or authentication fails. |
| try { |
| DataOutputStream dos = new DataOutputStream(new BufferedOutputStream( |
| SocketUtils.getOutputStream(socket)));//socket.getOutputStream())); |
| // write the message type, message length and the error message (if any) |
| writeMessage(dos, responseByte, unsuccessfulMsg, clientVersion, epType, qSize); |
| } |
| catch (IOException ioe) {// remove the added proxy if we get IOException. |
| if (l_proxy != null) { |
| boolean keepProxy = l_proxy.close(false, false); // do not check for queue, just close it |
| if (!keepProxy) { |
| removeClientProxy(l_proxy); |
| } |
| } |
| throw ioe; |
| } |
| |
| if (unsuccessfulMsg != null && logger.isDebugEnabled()) { |
| logger.debug(unsuccessfulMsg); |
| } |
| |
| // If the client is not durable, start its message processor |
| // Starting it here (instead of in the CacheClientProxy constructor) |
| // will ensure that the response byte is sent to the client before |
| // the marker message. If the client is durable, the message processor |
| // is not started until the clientReady message is received. |
| if (!clientIsDurable && l_proxy != null && |
| responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) { |
| // The startOrResumeMessageDispatcher tests if the proxy is a primary. |
| // If this is a secondary proxy, the dispatcher is not started. |
| // The false parameter signifies that a marker message has not already been |
| // processed. This will generate and send one. |
| l_proxy.startOrResumeMessageDispatcher(false); |
| } |
| |
| if (responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: Successfully registered {}", l_proxy); |
| } |
| } else { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_UNSUCCESSFULLY_REGISTERED_CLIENT_WITH_IDENTIFIER__0, proxyId)); |
| } |
| return l_proxy; |
| } |
| |
| private boolean initializeProxy(CacheClientProxy l_proxy) throws IOException, CacheException { |
| boolean status = false; |
| if (!this.isProxyInInitializationMode(l_proxy)){ |
| if (logger.isDebugEnabled()) { |
| logger.debug("Initializing proxy: {}", l_proxy); |
| } |
| try { |
| // Add client proxy to initialization list. This has to be done before |
| // the queue is created so that events can be buffered here for delivery |
| // to the queue once it's initialized (bug #41681 and others) |
| addClientInitProxy(l_proxy); |
| l_proxy.initializeMessageDispatcher(); |
| // Initialization success. Add to client proxy list. |
| addClientProxy(l_proxy); |
| return true; |
| } catch (RegionExistsException ree) { |
| if (logger.isDebugEnabled()) { |
| String name = ree.getRegion() != null ? ree.getRegion().getFullPath() : "null region"; |
| logger.debug("Found RegionExistsException while initializing proxy. Region name: {}", name); |
| } |
| // This will return false; |
| } finally { |
| removeClientInitProxy(l_proxy); |
| } |
| } |
| return status; |
| } |
| |
| /** |
| * Makes Primary to this CacheClientProxy and start the dispatcher of the |
| * CacheClientProxy |
| * |
| * @param proxyId |
| * @param isClientReady Whether the marker has already been processed. This |
| * value helps determine whether to start the dispatcher. |
| */ |
| public void makePrimary(ClientProxyMembershipID proxyId, boolean isClientReady) |
| { |
| CacheClientProxy proxy = getClientProxy(proxyId); |
| if (proxy != null) { |
| proxy.setPrimary(true); |
| |
| /* If the client represented by this proxy has: |
| * - already processed the marker message (meaning the client is failing |
| * over to this server as its primary) <or> |
| * - is not durable (meaning the marker message is being processed |
| * automatically |
| * |
| * Then, start or resume the dispatcher. Otherwise, let the clientReady |
| * message start the dispatcher. |
| * See CacheClientProxy.startOrResumeMessageDispatcher |
| if (!proxy._messageDispatcher.isAlive()) { |
| |
| proxy._messageDispatcher._messageQueue.setPrimary(true); |
| proxy._messageDispatcher.start(); |
| } |
| */ |
| if (isClientReady || !proxy.isDurable()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: Notifying proxy to start dispatcher for: {}", proxy); |
| } |
| proxy.startOrResumeMessageDispatcher(false); |
| } |
| } else { |
| throw new InternalGemFireError("No cache client proxy on this node for proxyId " + proxyId); |
| } |
| } |
| |
| /** |
| * Adds or updates entry in the dispatched message map when client sends an ack. |
| * |
| * @param proxyId |
| * @param eid |
| * @return success |
| */ |
| public boolean processDispatchedMessage(ClientProxyMembershipID proxyId, EventID eid) |
| { |
| boolean success = false; |
| CacheClientProxy proxy = getClientProxy(proxyId); |
| if (proxy != null) { |
| HARegionQueue harq = proxy.getHARegionQueue(); |
| harq.addDispatchedMessage(new ThreadIdentifier(eid.getMembershipID(), |
| eid.getThreadID()), |
| eid.getSequenceID()); |
| success = true; |
| } |
| return success; |
| } |
| |
| /** |
| * Sets keepalive on the proxy of the given membershipID |
| * |
| * @param membershipID |
| * Uniquely identifies the client pool |
| * @since 5.7 |
| */ |
| public void setKeepAlive(ClientProxyMembershipID membershipID, boolean keepAlive) |
| { |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: setKeepAlive client: {}", membershipID); |
| } |
| CacheClientProxy proxy = getClientProxy(membershipID); |
| if (proxy != null) { |
| // Close the port if the proxy represents the client and contains the |
| // port) |
| // // If so, remove the port from the client's remote ports |
| // proxy.removePort(clientPort); |
| // Set the keepalive flag |
| proxy.setKeepAlive(keepAlive); |
| } |
| } |
| |
| /** |
| * Unregisters an existing client from this server. |
| * |
| * @param memberId |
| * Uniquely identifies the client |
| * |
| * |
| */ |
| public void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) |
| { |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: Unregistering all clients with member id: {}", memberId); |
| } |
| CacheClientProxy proxy = getClientProxy(memberId); |
| if (proxy != null) { |
| final boolean isTraceEnabled = logger.isTraceEnabled(); |
| if (isTraceEnabled) { |
| logger.trace("CacheClientNotifier: Potential client: {}", proxy); |
| } |
| |
| // If the proxy's member id is the same as the input member id, add |
| // it to the set of dead proxies. |
| if (!proxy.startRemoval()) { |
| if (isTraceEnabled) { |
| logger.trace("CacheClientNotifier: Potential client: {} matches {}", proxy, memberId); |
| } |
| closeDeadProxies(Collections.singletonList(proxy), normalShutdown); |
| } |
| } |
| } |
| |
| /** |
| * The client represented by the proxyId is ready to receive updates. |
| * |
| * @param proxyId |
| */ |
| public void readyForEvents(ClientProxyMembershipID proxyId) |
| { |
| CacheClientProxy proxy = getClientProxy(proxyId); |
| if (proxy == null) { |
| //@todo log a message |
| } else { |
| // False signifies that a marker message has not already been processed. |
| // Generate and send one. |
| proxy.startOrResumeMessageDispatcher(false); |
| } |
| } |
| |
| private ClientUpdateMessageImpl constructClientMessage(InternalCacheEvent event){ |
| ClientUpdateMessageImpl clientMessage = null; |
| EnumListenerEvent operation = event.getEventType(); |
| |
| try { |
| clientMessage = initializeMessage(operation, event); |
| } catch (Exception e) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_CANNOT_NOTIFY_CLIENTS_TO_PERFORM_OPERATION_0_ON_EVENT_1, new Object[] { operation, event}), e); |
| } |
| return clientMessage; |
| } |
| |
| /** |
| * notify interested clients of the given cache event. The |
| * event should have routing information in it that determines |
| * which clients will receive the event. |
| */ |
| public static void notifyClients(InternalCacheEvent event) { |
| CacheClientNotifier instance = ccnSingleton; |
| if (instance != null) { |
| instance.singletonNotifyClients(event, null); |
| |
| } |
| } |
| |
| /** |
| * notify interested clients of the given cache event using the |
| * given update message. The |
| * event should have routing information in it that determines |
| * which clients will receive the event. |
| */ |
| public static void notifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg) { |
| CacheClientNotifier instance = ccnSingleton; |
| if (instance != null) { |
| instance.singletonNotifyClients(event, cmsg); |
| |
| } |
| } |
| |
| private void singletonNotifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg){ |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| final boolean isTraceEnabled = logger.isTraceEnabled(); |
| |
| FilterInfo filterInfo = event.getLocalFilterInfo(); |
| |
| // if (_logger.fineEnabled()) { |
| // _logger.fine("Client dispatcher processing event " + event); |
| // } |
| |
| FilterProfile regionProfile = ((LocalRegion)event.getRegion()).getFilterProfile(); |
| if (filterInfo != null) { |
| // if the routing was made using an old profile we need to recompute it |
| if (isTraceEnabled) { |
| logger.trace("Event isOriginRemote={}", event.isOriginRemote()); |
| } |
| } |
| |
| if ((filterInfo == null || |
| (filterInfo.getCQs() == null && |
| filterInfo.getInterestedClients() == null && |
| filterInfo.getInterestedClientsInv() == null))) { |
| return; |
| } |
| |
| long startTime = this._statistics.startTime(); |
| |
| ClientUpdateMessageImpl clientMessage; |
| if (cmsg == null) { |
| clientMessage = constructClientMessage(event); |
| } else { |
| clientMessage = (ClientUpdateMessageImpl)cmsg; |
| } |
| if (clientMessage == null){ |
| return; |
| } |
| |
| // Holds the clientIds to which filter message needs to be sent. |
| Set<ClientProxyMembershipID> filterClients = new HashSet(); |
| |
| // Add CQ info. |
| if (filterInfo.getCQs() != null) { |
| for (Map.Entry<Long, Integer> e: filterInfo.getCQs().entrySet()) { |
| Long cqID = e.getKey(); |
| String cqName = regionProfile.getRealCqID(cqID); |
| if (cqName == null){ |
| continue; |
| } |
| ServerCQ cq = regionProfile.getCq(cqName); |
| if (cq != null){ |
| ClientProxyMembershipID id = cq.getClientProxyId(); |
| filterClients.add(id); |
| if (isDebugEnabled) { |
| logger.debug("Adding cq routing info to message for id: {} and cq: {}", id, cqName); |
| } |
| |
| clientMessage.addClientCq(id, cq.getName(), e.getValue()); |
| } |
| } |
| } |
| |
| // Add interestList info. |
| if (filterInfo.getInterestedClientsInv() != null) { |
| Set<Object>rawIDs = regionProfile.getRealClientIDs(filterInfo.getInterestedClientsInv()); |
| Set<ClientProxyMembershipID> ids = getProxyIDs(rawIDs, true); |
| if (ids.remove(event.getContext())) { // don't send to member of origin |
| CacheClientProxy ccp = getClientProxy(event.getContext()); |
| if (ccp != null) { |
| ccp.getStatistics().incMessagesNotQueuedOriginator(); |
| } |
| } |
| if (!ids.isEmpty()) { |
| if (isTraceEnabled) { |
| logger.trace("adding invalidation routing to message for {}" + ids); |
| } |
| clientMessage.addClientInterestList(ids, false); |
| filterClients.addAll(ids); |
| } |
| } |
| if (filterInfo.getInterestedClients() != null) { |
| Set<Object>rawIDs = regionProfile.getRealClientIDs(filterInfo.getInterestedClients()); |
| Set<ClientProxyMembershipID> ids = getProxyIDs(rawIDs, true); |
| if (ids.remove(event.getContext())) { // don't send to member of origin |
| CacheClientProxy ccp = getClientProxy(event.getContext()); |
| if (ccp != null) { |
| ccp.getStatistics().incMessagesNotQueuedOriginator(); |
| } |
| } |
| if (!ids.isEmpty()) { |
| if (isTraceEnabled) { |
| logger.trace("adding routing to message for {}", ids); |
| } |
| clientMessage.addClientInterestList(ids, true); |
| filterClients.addAll(ids); |
| } |
| } |
| |
| Conflatable conflatable = null; |
| |
| if (clientMessage instanceof ClientTombstoneMessage) { |
| // bug #46832 - HAEventWrapper deserialization can't handle subclasses |
| // of ClientUpdateMessageImpl, so don't wrap them |
| conflatable = clientMessage; |
| // Remove clients older than 70 from the filterClients if the message is |
| // ClientTombstoneMessage. Fix for #46591. |
| Object[] objects = filterClients.toArray(); |
| for (Object id : objects) { |
| CacheClientProxy ccp = getClientProxy((ClientProxyMembershipID)id, true); |
| if (ccp != null && ccp.getVersion().compareTo(Version.GFE_70) < 0) { |
| filterClients.remove(id); |
| } |
| } |
| } else { |
| HAEventWrapper wrapper = new HAEventWrapper(clientMessage); |
| // Set the putInProgress flag to true before starting the put on proxy's |
| // HA queues. Nowhere else, this flag is being set to true. |
| wrapper.setPutInProgress(true); |
| conflatable = wrapper; |
| } |
| |
| singletonRouteClientMessage(conflatable, filterClients); |
| |
| this._statistics.endEvent(startTime); |
| |
| // Cleanup destroyed events in CQ result cache. |
| // While maintaining the CQ results key caching. the destroy event |
| // keys are marked as destroyed instead of removing them, this is |
| // to take care, arrival of duplicate events. The key marked as |
| // destroyed are removed after the event is placed in clients HAQueue. |
| if (filterInfo.filterProcessedLocally){ |
| removeDestroyTokensFromCqResultKeys(event, filterInfo); |
| } |
| |
| } |
| |
| |
| private void removeDestroyTokensFromCqResultKeys(InternalCacheEvent event, FilterInfo filterInfo){ |
| FilterProfile regionProfile = ((LocalRegion)event.getRegion()).getFilterProfile(); |
| if (event.getOperation().isEntry() && filterInfo.getCQs() != null) { |
| EntryEventImpl entryEvent = (EntryEventImpl)event; |
| for (Map.Entry<Long, Integer> e: filterInfo.getCQs().entrySet()) { |
| Long cqID = e.getKey(); |
| String cqName = regionProfile.getRealCqID(cqID); |
| if (cqName != null) { |
| ServerCQ cq = regionProfile.getCq(cqName); |
| if (cq != null |
| && e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY))) { |
| cq.removeFromCqResultKeys(entryEvent.getKey(), true); |
| } |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * delivers the given message to all proxies for routing. The message should |
| * already have client interest established, or override the isClientInterested |
| * method to implement its own routing |
| * @param clientMessage |
| */ |
| public static void routeClientMessage(Conflatable clientMessage) { |
| CacheClientNotifier instance = ccnSingleton; |
| if (instance != null) { |
| instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet()); // ok to use keySet here because all we do is call getClientProxy with these keys |
| } |
| } |
| |
| /* |
| * this is for server side registration of client queue |
| */ |
| public static void routeSingleClientMessage(ClientUpdateMessage clientMessage, ClientProxyMembershipID clientProxyMembershipId) { |
| CacheClientNotifier instance = ccnSingleton; |
| if (instance != null) { |
| instance.singletonRouteClientMessage(clientMessage, Collections.singleton(clientProxyMembershipId)); |
| } |
| } |
| |
| private void singletonRouteClientMessage(Conflatable conflatable, |
| Collection<ClientProxyMembershipID> filterClients) { |
| |
| this._cache.getCancelCriterion().checkCancelInProgress(null); // bug #43942 - client notified but no p2p distribution |
| |
| List<CacheClientProxy> deadProxies = null; |
| for(ClientProxyMembershipID clientId: filterClients ) { |
| CacheClientProxy proxy; |
| proxy = this.getClientProxy(clientId, true); |
| if (proxy != null) { |
| if (proxy.isAlive() || proxy.isPaused() || proxy.isConnected() || proxy.isDurable()) { |
| proxy.deliverMessage(conflatable); |
| } else { |
| proxy.getStatistics().incMessagesFailedQueued(); |
| if (deadProxies == null) { |
| deadProxies = new ArrayList<CacheClientProxy>(); |
| } |
| deadProxies.add(proxy); |
| } |
| this.blackListSlowReciever(proxy); |
| } |
| } |
| checkAndRemoveFromClientMsgsRegion(conflatable); |
| // Remove any dead clients from the clients to notify |
| if (deadProxies != null) { |
| closeDeadProxies(deadProxies, false); |
| } |
| |
| } |
| |
| /** |
| * processes the given collection of durable and non-durable client identifiers, |
| * returning a collection of non-durable identifiers of clients connected to this VM |
| */ |
| public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) { |
| return getProxyIDs(mixedDurableAndNonDurableIDs, false); |
| } |
| |
| /** |
| * processes the given collection of durable and non-durable client identifiers, |
| * returning a collection of non-durable identifiers of clients connected to this VM. |
| * This version can check for proxies in initialization as well as fully initialized |
| * proxies. |
| */ |
| public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs, |
| boolean proxyInInitMode) { |
| Set<ClientProxyMembershipID> result = new HashSet(); |
| for (Object id: mixedDurableAndNonDurableIDs) { |
| if (id instanceof String) { |
| CacheClientProxy clientProxy = getClientProxy((String)id, true); |
| if (clientProxy != null) { |
| result.add(clientProxy.getProxyID()); |
| } |
| // else { we don't have a proxy for the given durable ID } |
| } else { |
| // try to canonicalize the ID. |
| CacheClientProxy proxy = getClientProxy((ClientProxyMembershipID)id, true); |
| if (proxy != null) { |
| //this._logger.info(LocalizedStrings.DEBUG, "BRUCE: found match for " + id + ": " + proxy.getProxyID()); |
| result.add(proxy.getProxyID()); |
| } else { |
| //this._logger.info(LocalizedStrings.DEBUG, "BRUCE: did not find match for " + id); |
| // this was causing OOMEs in HARegion initial image processing because |
| // messages had routing for clients unknown to this server |
| //result.add((ClientProxyMembershipID)id); |
| } |
| } |
| } |
| return result; |
| } |
| |
| private void blackListSlowReciever(CacheClientProxy clientProxy){ |
| final CacheClientProxy proxy = clientProxy; |
| if ((proxy.getHARegionQueue() != null && proxy.getHARegionQueue().isClientSlowReciever()) |
| && !blackListedClients.contains(proxy.getProxyID())) { |
| // log alert with client info. |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_CLIENT_0_IS_A_SLOW_RECEIVER, new Object[] { proxy.getProxyID() })); |
| addToBlacklistedClient(proxy.getProxyID()); |
| InternalDistributedSystem ids = (InternalDistributedSystem)this.getCache() |
| .getDistributedSystem(); |
| final DM dm = ids.getDistributionManager(); |
| dm.getWaitingThreadPool().execute(new Runnable() { |
| public void run() { |
| |
| CacheDistributionAdvisor advisor = ((DistributedRegion)proxy |
| .getHARegionQueue().getRegion()).getCacheDistributionAdvisor(); |
| Set members = advisor.adviseCacheOp(); |
| |
| // Send client blacklist message |
| ClientBlacklistProcessor.sendBlacklistedClient(proxy.getProxyID(), |
| dm, members); |
| |
| // close the proxy for slow receiver. |
| proxy.close(false, false); |
| removeClientProxy(proxy); |
| |
| if (PoolImpl.AFTER_QUEUE_DESTROY_MESSAGE_FLAG) { |
| BridgeObserver bo = BridgeObserverHolder.getInstance(); |
| bo.afterQueueDestroyMessage(); |
| } |
| |
| // send remove from blacklist. |
| RemoveClientFromBlacklistMessage rcm = new RemoveClientFromBlacklistMessage(); |
| rcm.setProxyID(proxy.getProxyID()); |
| dm.putOutgoing(rcm); |
| blackListedClients.remove(proxy.getProxyID()); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Initializes a <code>ClientUpdateMessage</code> from an operation and |
| * event |
| * |
| * @param operation |
| * The operation that occurred (e.g. AFTER_CREATE) |
| * @param event |
| * The event containing the data to be updated |
| * @return a <code>ClientUpdateMessage</code> |
| * @throws Exception |
| */ |
| private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, |
| CacheEvent event) throws Exception |
| { |
| if (!supportsOperation(operation)) { |
| throw new Exception(LocalizedStrings.CacheClientNotifier_THE_CACHE_CLIENT_NOTIFIER_DOES_NOT_SUPPORT_OPERATIONS_OF_TYPE_0.toLocalizedString(operation)); |
| } |
| // String regionName = event.getRegion().getFullPath(); |
| Object keyOfInterest = null; |
| final EventID eventIdentifier; |
| ClientProxyMembershipID membershipID = null; |
| boolean isNetLoad = false; |
| Object callbackArgument = null; |
| byte[] delta = null; |
| VersionTag versionTag = null; |
| |
| if (event.getOperation().isEntry()) { |
| EntryEventImpl entryEvent = (EntryEventImpl)event; |
| versionTag = entryEvent.getVersionTag(); |
| delta = entryEvent.getDeltaBytes(); |
| callbackArgument = entryEvent.getRawCallbackArgument(); |
| if (entryEvent.isBridgeEvent()) { |
| membershipID = entryEvent.getContext(); |
| } |
| keyOfInterest = entryEvent.getKey(); |
| eventIdentifier = entryEvent.getEventId(); |
| isNetLoad = entryEvent.isNetLoad(); |
| } |
| else { |
| RegionEventImpl regionEvent = (RegionEventImpl)event; |
| callbackArgument = regionEvent.getRawCallbackArgument(); |
| eventIdentifier = regionEvent.getEventId(); |
| if (event instanceof BridgeRegionEventImpl) { |
| BridgeRegionEventImpl bridgeEvent = (BridgeRegionEventImpl)event; |
| membershipID = bridgeEvent.getContext(); |
| } |
| } |
| |
| // NOTE: If delta is non-null, value MUST be in Object form of type Delta. |
| ClientUpdateMessageImpl clientUpdateMsg = new ClientUpdateMessageImpl(operation, |
| (LocalRegion)event.getRegion(), keyOfInterest, null, delta, (byte) 0x01, |
| callbackArgument, membershipID, eventIdentifier, versionTag); |
| |
| if (event.getOperation().isEntry()) { |
| EntryEventImpl entryEvent = (EntryEventImpl)event; |
| // only need a value if notifyBySubscription is true |
| entryEvent.exportNewValue(clientUpdateMsg); |
| } |
| |
| if (isNetLoad) { |
| clientUpdateMsg.setIsNetLoad(isNetLoad); |
| } |
| |
| return clientUpdateMsg; |
| } |
| |
| /** |
| * Returns whether the <code>CacheClientNotifier</code> supports the input |
| * operation. |
| * |
| * @param operation |
| * The operation that occurred (e.g. AFTER_CREATE) |
| * @return whether the <code>CacheClientNotifier</code> supports the input |
| * operation |
| */ |
| protected boolean supportsOperation(EnumListenerEvent operation) |
| { |
| return operation == EnumListenerEvent.AFTER_CREATE |
| || operation == EnumListenerEvent.AFTER_UPDATE |
| || operation == EnumListenerEvent.AFTER_DESTROY |
| || operation == EnumListenerEvent.AFTER_INVALIDATE |
| || operation == EnumListenerEvent.AFTER_REGION_DESTROY |
| || operation == EnumListenerEvent.AFTER_REGION_CLEAR |
| || operation == EnumListenerEvent.AFTER_REGION_INVALIDATE; |
| } |
| |
| // /** |
| // * Queues the <code>ClientUpdateMessage</code> to be distributed |
| // * to interested clients. This method is not being used currently. |
| // * @param clientMessage The <code>ClientUpdateMessage</code> to be queued |
| // */ |
| // protected void notifyClients(final ClientUpdateMessage clientMessage) |
| // { |
| // if (USE_SYNCHRONOUS_NOTIFICATION) |
| // { |
| // // Execute the method in the same thread as the caller |
| // deliver(clientMessage); |
| // } |
| // else { |
| // // Obtain an Executor and use it to execute the method in its own thread |
| // try |
| // { |
| // getExecutor().execute(new Runnable() |
| // { |
| // public void run() |
| // { |
| // deliver(clientMessage); |
| // } |
| // } |
| // ); |
| // } catch (InterruptedException e) |
| // { |
| // _logger.warning("CacheClientNotifier: notifyClients interrupted", e); |
| // Thread.currentThread().interrupt(); |
| // } |
| // } |
| // } |
| |
| // /** |
| // * Updates the information this <code>CacheClientNotifier</code> maintains |
| // * for a given edge client. It is invoked when a edge client re-connects to |
| // * the server. |
| // * |
| // * @param clientHost |
| // * The host on which the client runs (i.e. the host the |
| // * CacheClientNotifier uses to communicate with the |
| // * CacheClientUpdater) This is used with the clientPort to uniquely |
| // * identify the client |
| // * @param clientPort |
| // * The port through which the server communicates with the client |
| // * (i.e. the port the CacheClientNotifier uses to communicate with |
| // * the CacheClientUpdater) This is used with the clientHost to |
| // * uniquely identify the client |
| // * @param remotePort |
| // * The port through which the client communicates with the server |
| // * (i.e. the new port the ConnectionImpl uses to communicate with the |
| // * ServerConnection) |
| // * @param membershipID |
| // * Uniquely idenifies the client |
| // */ |
| // public void registerClientPort(String clientHost, int clientPort, |
| // int remotePort, ClientProxyMembershipID membershipID) |
| // { |
| // if (_logger.fineEnabled()) |
| // _logger.fine("CacheClientNotifier: Registering client port: " |
| // + clientHost + ":" + clientPort + " with remote port " + remotePort |
| // + " and ID " + membershipID); |
| // for (Iterator i = getClientProxies().iterator(); i.hasNext();) { |
| // CacheClientProxy proxy = (CacheClientProxy)i.next(); |
| // if (_logger.finerEnabled()) |
| // _logger.finer("CacheClientNotifier: Potential client: " + proxy); |
| // //if (proxy.representsCacheClientUpdater(clientHost, clientPort)) |
| // if (proxy.isMember(membershipID)) { |
| // if (_logger.finerEnabled()) |
| // _logger |
| // .finer("CacheClientNotifier: Updating remotePorts since host and port are a match"); |
| // proxy.addPort(remotePort); |
| // } |
| // else { |
| // if (_logger.finerEnabled()) |
| // _logger.finer("CacheClientNotifier: Host and port " |
| // + proxy.getRemoteHostAddress() + ":" + proxy.getRemotePort() |
| // + " do not match " + clientHost + ":" + clientPort); |
| // } |
| // } |
| // } |
| |
| /** |
| * Registers client interest in the input region and key. |
| * |
| * @param regionName |
| * The name of the region of interest |
| * @param keyOfInterest |
| * The name of the key of interest |
| * @param membershipID clients ID |
| * @param interestType type of registration |
| * @param isDurable whether the registration persists when client goes away |
| * @param sendUpdatesAsInvalidates client wants invalidation messages |
| * @param manageEmptyRegions whether to book keep empty region information |
| * @param regionDataPolicy (0=empty) |
| */ |
| public void registerClientInterest(String regionName, Object keyOfInterest, |
| ClientProxyMembershipID membershipID, int interestType, boolean isDurable, |
| boolean sendUpdatesAsInvalidates, |
| boolean manageEmptyRegions, int regionDataPolicy, |
| boolean flushState) throws IOException, RegionDestroyedException |
| { |
| |
| CacheClientProxy proxy = getClientProxy(membershipID, true); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: Client {} registering interest in: {} -> {} (an instance of {})", proxy, regionName, keyOfInterest, keyOfInterest.getClass().getName()); |
| } |
| |
| if(proxy == null){ |
| // client should see this and initiates failover |
| throw new IOException(LocalizedStrings.CacheClientNotifier_CACHECLIENTPROXY_FOR_THIS_CLIENT_IS_NO_LONGER_ON_THE_SERVER_SO_REGISTERINTEREST_OPERATION_IS_UNSUCCESSFUL.toLocalizedString()); |
| } |
| |
| boolean done = false; |
| try { |
| proxy.registerClientInterest(regionName, keyOfInterest, interestType, |
| isDurable, sendUpdatesAsInvalidates, flushState); |
| |
| if (manageEmptyRegions) { |
| updateMapOfEmptyRegions(proxy.getRegionsWithEmptyDataPolicy(), |
| regionName, regionDataPolicy); |
| } |
| |
| done = true; |
| } |
| finally { |
| if (!done) { |
| proxy.unregisterClientInterest(regionName, keyOfInterest, interestType, |
| false); |
| } |
| } |
| } |
| |
| /* |
| protected void addFilterRegisteredClients(String regionName, |
| ClientProxyMembershipID membershipID) throws RegionNotFoundException { |
| // Update Regions book keeping. |
| LocalRegion region = (LocalRegion)this._cache.getRegion(regionName); |
| if (region == null) { |
| //throw new AssertionError("Could not find region named '" + regionName + "'"); |
| // @todo: see bug 36805 |
| // fix for bug 37979 |
| if (_logger.fineEnabled()) { |
| _logger |
| .fine("CacheClientNotifier: Client " + membershipID |
| + " :Throwing RegionDestroyedException as region: " + regionName + " is not present."); |
| } |
| throw new RegionDestroyedException("registerInterest failed", regionName); |
| } |
| else { |
| region.getFilterProfile().addFilterRegisteredClients(this, membershipID); |
| } |
| } |
| */ |
| |
| /** |
| * Store region and delta relation |
| * |
| * @param regionsWithEmptyDataPolicy |
| * @param regionName |
| * @param regionDataPolicy (0==empty) |
| * @since 6.1 |
| */ |
| public void updateMapOfEmptyRegions(Map regionsWithEmptyDataPolicy, |
| String regionName, int regionDataPolicy) { |
| if (regionDataPolicy == 0) { |
| if (!regionsWithEmptyDataPolicy.containsKey(regionName)) { |
| regionsWithEmptyDataPolicy.put(regionName, Integer.valueOf(0)); |
| } |
| } |
| } |
| |
| /** |
| * Unregisters client interest in the input region and key. |
| * |
| * @param regionName |
| * The name of the region of interest |
| * @param keyOfInterest |
| * The name of the key of interest |
| * @param isClosing |
| * Whether the caller is closing |
| * @param membershipID |
| * The <code>ClientProxyMembershipID</code> of the client no longer |
| * interested in this <code>Region</code> and key |
| */ |
| public void unregisterClientInterest(String regionName, Object keyOfInterest, |
| int interestType, boolean isClosing, ClientProxyMembershipID membershipID, boolean keepalive) |
| { |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: Client {} unregistering interest in: {} -> {} (an instance of {})", membershipID, regionName, keyOfInterest, keyOfInterest.getClass().getName()); |
| } |
| CacheClientProxy proxy = getClientProxy(membershipID); |
| if(proxy != null) { |
| proxy.setKeepAlive(keepalive); |
| proxy.unregisterClientInterest(regionName, keyOfInterest, interestType, isClosing); |
| } |
| } |
| |
| /** |
| * Registers client interest in the input region and list of keys. |
| * |
| * @param regionName |
| * The name of the region of interest |
| * @param keysOfInterest |
| * The list of keys of interest |
| * @param membershipID |
| * The <code>ClientProxyMembershipID</code> of the client no longer |
| * interested in this <code>Region</code> and key |
| */ |
| public void registerClientInterest(String regionName, List keysOfInterest, |
| ClientProxyMembershipID membershipID, boolean isDurable, |
| boolean sendUpdatesAsInvalidates, |
| boolean manageEmptyRegions, int regionDataPolicy, boolean flushState) throws IOException, RegionDestroyedException |
| { |
| CacheClientProxy proxy = getClientProxy(membershipID, true); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: Client {} registering interest in: {} -> {}", proxy, regionName, keysOfInterest); |
| } |
| |
| if(proxy == null){ |
| throw new IOException(LocalizedStrings.CacheClientNotifier_CACHECLIENTPROXY_FOR_THIS_CLIENT_IS_NO_LONGER_ON_THE_SERVER_SO_REGISTERINTEREST_OPERATION_IS_UNSUCCESSFUL.toLocalizedString()); |
| } |
| |
| proxy.registerClientInterestList(regionName, keysOfInterest, isDurable, |
| sendUpdatesAsInvalidates, flushState); |
| |
| if (manageEmptyRegions) { |
| updateMapOfEmptyRegions(proxy.getRegionsWithEmptyDataPolicy(), regionName, |
| regionDataPolicy); |
| } |
| } |
| |
| /** |
| * Unregisters client interest in the input region and list of keys. |
| * |
| * @param regionName |
| * The name of the region of interest |
| * @param keysOfInterest |
| * The list of keys of interest |
| * @param isClosing |
| * Whether the caller is closing |
| * @param membershipID |
| * The <code>ClientProxyMembershipID</code> of the client no longer |
| * interested in this <code>Region</code> and key |
| */ |
| public void unregisterClientInterest(String regionName, List keysOfInterest, |
| boolean isClosing, ClientProxyMembershipID membershipID, boolean keepalive) |
| { |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier: Client {} unregistering interest in: {} -> {}", membershipID, regionName, keysOfInterest); |
| } |
| CacheClientProxy proxy = getClientProxy(membershipID); |
| if (proxy != null) { |
| proxy.setKeepAlive(keepalive); |
| proxy.unregisterClientInterest(regionName, keysOfInterest, isClosing); |
| } |
| } |
| |
| |
| /** |
| * If the conflatable is an instance of HAEventWrapper, and if the |
| * corresponding entry is present in the haContainer, set the |
| * reference to the clientUpdateMessage to null and putInProgress flag to |
| * false. Also, if the ref count is zero, then remove the entry from the |
| * haContainer. |
| * |
| * @param conflatable |
| * @since 5.7 |
| */ |
| private void checkAndRemoveFromClientMsgsRegion(Conflatable conflatable) { |
| if (conflatable instanceof HAEventWrapper) { |
| HAEventWrapper wrapper = (HAEventWrapper)conflatable; |
| if (!wrapper.getIsRefFromHAContainer()) { |
| wrapper = (HAEventWrapper)haContainer.getKey(wrapper); |
| if (wrapper != null && !wrapper.getPutInProgress()) { |
| synchronized (haContainer) { |
| if (wrapper.getReferenceCount() == 0L) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Removing event from haContainer: {}", wrapper); |
| } |
| haContainer.remove(wrapper); |
| } |
| } |
| } |
| //else { |
| // This is a replay-of-event case. |
| //} |
| } |
| else { |
| // This wrapper resides in haContainer. |
| wrapper.setClientUpdateMessage(null); |
| wrapper.setPutInProgress(false); |
| synchronized (haContainer) { |
| if (wrapper.getReferenceCount() == 0L) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Removing event from haContainer: {}", wrapper); |
| } |
| haContainer.remove(wrapper); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns the <code>CacheClientProxy</code> associated to the membershipID * |
| * |
| * @return the <code>CacheClientProxy</code> associated to the membershipID |
| */ |
| public CacheClientProxy getClientProxy(ClientProxyMembershipID membershipID) |
| { |
| return (CacheClientProxy)this._clientProxies.get(membershipID); |
| } |
| |
| /** |
| * Returns the CacheClientProxy associated to the membershipID. This |
| * looks at both proxies that are initialized and those that are still |
| * in initialization mode. |
| */ |
| public CacheClientProxy getClientProxy(ClientProxyMembershipID membershipID, |
| boolean proxyInInitMode) |
| { |
| CacheClientProxy proxy = getClientProxy(membershipID); |
| if (proxyInInitMode && proxy == null) { |
| proxy = (CacheClientProxy)this._initClientProxies.get(membershipID); |
| } |
| return proxy; |
| } |
| |
| |
| /** |
| * Returns the <code>CacheClientProxy</code> associated to the |
| * durableClientId |
| * |
| * @return the <code>CacheClientProxy</code> associated to the |
| * durableClientId |
| */ |
| public CacheClientProxy getClientProxy(String durableClientId) { |
| return getClientProxy(durableClientId, false); |
| } |
| |
| /** |
| * Returns the <code>CacheClientProxy</code> associated to the |
| * durableClientId. This version of the method can check for initializing |
| * proxies as well as fully initialized proxies. |
| * |
| * @return the <code>CacheClientProxy</code> associated to the |
| * durableClientId |
| */ |
| public CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| final boolean isTraceEnabled = logger.isTraceEnabled(); |
| |
| if (isDebugEnabled) { |
| logger.debug("CacheClientNotifier: Determining client for {}", durableClientId); |
| } |
| CacheClientProxy proxy = null; |
| for (Iterator i = getClientProxies().iterator(); i.hasNext();) { |
| CacheClientProxy clientProxy = (CacheClientProxy)i.next(); |
| if (isTraceEnabled) { |
| logger.trace("CacheClientNotifier: Checking client {}", clientProxy); |
| } |
| if (clientProxy.getDurableId().equals(durableClientId)) { |
| proxy = clientProxy; |
| if (isDebugEnabled) { |
| logger.debug("CacheClientNotifier: {} represents the durable client {}", proxy, durableClientId); |
| } |
| break; |
| } |
| } |
| if (proxy == null && proxyInInitMode) { |
| for (Iterator i = this._initClientProxies.values().iterator(); i.hasNext();) { |
| CacheClientProxy clientProxy = (CacheClientProxy)i.next(); |
| if (isTraceEnabled) { |
| logger.trace("CacheClientNotifier: Checking initializing client {}", clientProxy); |
| } |
| if (clientProxy.getDurableId().equals(durableClientId)) { |
| proxy = clientProxy; |
| if (isDebugEnabled) { |
| logger.debug("CacheClientNotifier: initializing client {} represents the durable client {}", proxy, durableClientId); |
| } |
| break; |
| } |
| } |
| } |
| return proxy; |
| } |
| |
| /** |
| * Returns the <code>CacheClientProxySameDS</code> associated to the |
| * membershipID * |
| * @return the <code>CacheClientProxy</code> associated to the same |
| * distributed system |
| */ |
| public CacheClientProxy getClientProxySameDS(ClientProxyMembershipID membershipID) { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| logger.debug("{}::getClientProxySameDS(), Determining client for host {}", this, membershipID); |
| logger.debug("{}::getClientProxySameDS(), Number of proxies in the Cache Clinet Notifier: {}", this, getClientProxies().size()); |
| /* _logger.fine(this + "::getClientProxySameDS(), Proxies in the Cache Clinet Notifier: " |
| + getClientProxies());*/ |
| } |
| CacheClientProxy proxy = null; |
| for (Iterator i = getClientProxies().iterator(); i.hasNext();) { |
| CacheClientProxy clientProxy = (CacheClientProxy) i.next(); |
| if (isDebugEnabled) { |
| logger.debug("CacheClientNotifier: Checking client {}", clientProxy); |
| } |
| if (clientProxy.isSameDSMember(membershipID)) { |
| proxy = clientProxy ; |
| if (isDebugEnabled) { |
| logger.debug("CacheClientNotifier: {} represents the client running on host {}", proxy, membershipID); |
| } |
| break; |
| } |
| } |
| return proxy; |
| } |
| |
| |
| /** |
| * It will remove the clients connected to the passed acceptorId. |
| * If its the only server, shuts down this instance. |
| */ |
| protected synchronized void shutdown(long acceptorId) { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| logger.debug("At cache server shutdown time, the number of cache servers in the cache is {}", this.getCache().getCacheServers().size()); |
| } |
| |
| Iterator it = this._clientProxies.values().iterator(); |
| // Close all the client proxies |
| while (it.hasNext()) { |
| CacheClientProxy proxy = (CacheClientProxy)it.next(); |
| if (proxy.getAcceptorId() != acceptorId){ |
| continue; |
| } |
| it.remove(); |
| try { |
| if (isDebugEnabled) { |
| logger.debug("CacheClientNotifier: Closing {}", proxy); |
| } |
| proxy.terminateDispatching(true); |
| } |
| catch (Exception ignore) { |
| if (isDebugEnabled) { |
| logger.debug("{}: Exception in closing down the CacheClientProxy", this, ignore); |
| } |
| } |
| } |
| |
| if (noActiveServer() && ccnSingleton != null){ |
| ccnSingleton = null; |
| haContainer.cleanUp(); |
| if (isDebugEnabled) { |
| logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName()); |
| } |
| this.clearCompiledQueries(); |
| blackListedClients.clear(); |
| |
| // cancel the ping task |
| this.clientPingTask.cancel(); |
| |
| // Close the statistics |
| this._statistics.close(); |
| } |
| } |
| |
| private boolean noActiveServer(){ |
| for (CacheServer server: this.getCache().getCacheServers()){ |
| if (server.isRunning()){ |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Adds a new <code>CacheClientProxy</code> to the list of known client |
| * proxies |
| * |
| * @param proxy |
| * The <code>CacheClientProxy</code> to add |
| */ |
| protected void addClientProxy(CacheClientProxy proxy) throws IOException |
| { |
| // this._logger.info(LocalizedStrings.DEBUG, "adding client proxy " + proxy); |
| getCache(); // ensure cache reference is up to date so firstclient state is correct |
| this._clientProxies.put(proxy.getProxyID(), proxy); |
| // Remove this proxy from the init proxy list. |
| removeClientInitProxy(proxy); |
| this._connectionListener.queueAdded(proxy.getProxyID()); |
| if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) { |
| // Delta not supported with conflation ON |
| ClientHealthMonitor chm = ClientHealthMonitor.getInstance(); |
| /* |
| * #41788 - If the client connection init starts while cache/member is |
| * shutting down, ClientHealthMonitor.getInstance() might return null. |
| */ |
| if (chm != null) { |
| chm.numOfClientsPerVersion.incrementAndGet(proxy.getVersion().ordinal()); |
| } |
| } |
| this.timedOutDurableClientProxies.remove(proxy.getProxyID()); |
| |
| } |
| |
| protected void addClientInitProxy(CacheClientProxy proxy) throws IOException |
| { |
| this._initClientProxies.put(proxy.getProxyID(), proxy); |
| } |
| |
| protected void removeClientInitProxy(CacheClientProxy proxy) throws IOException |
| { |
| this._initClientProxies.remove(proxy.getProxyID()); |
| } |
| |
| protected boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException |
| { |
| return this._initClientProxies.containsKey(proxy.getProxyID()); |
| } |
| |
| |
| /** |
| * Returns (possibly stale) set of memberIds for all clients being actively |
| * notified by this server. |
| * |
| * @return set of memberIds |
| */ |
| public Set getActiveClients() |
| { |
| Set clients = new HashSet(); |
| for (Iterator iter = getClientProxies().iterator(); iter.hasNext();) { |
| CacheClientProxy proxy = (CacheClientProxy)iter.next(); |
| if (proxy.hasRegisteredInterested()) { |
| ClientProxyMembershipID proxyID = proxy.getProxyID(); |
| clients.add(proxyID); |
| } |
| } |
| return clients; |
| } |
| |
| /** |
| * Return (possibly stale) list of all clients and their status |
| * @return Map, with CacheClientProxy as a key and CacheClientStatus as a value |
| */ |
| public Map getAllClients() |
| { |
| Map clients = new HashMap(); |
| for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) { |
| CacheClientProxy proxy = (CacheClientProxy)iter.next(); |
| ClientProxyMembershipID proxyID = proxy.getProxyID(); |
| clients.put(proxyID, new CacheClientStatus(proxyID)); |
| } |
| return clients; |
| } |
| |
| /** |
| * Checks if there is any proxy present for the given durable client |
| * |
| * @param durableId - |
| * id for the durable-client |
| * @return - true if a proxy is present for the given durable client |
| * |
| * @since 5.6 |
| */ |
| public boolean hasDurableClient(String durableId) |
| { |
| for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) { |
| CacheClientProxy proxy = (CacheClientProxy)iter.next(); |
| ClientProxyMembershipID proxyID = proxy.getProxyID(); |
| if (durableId.equals(proxyID.getDurableId())) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Checks if there is any proxy which is primary for the given durable client |
| * |
| * @param durableId - |
| * id for the durable-client |
| * @return - true if a primary proxy is present for the given durable client |
| * |
| * @since 5.6 |
| */ |
| public boolean hasPrimaryForDurableClient(String durableId) |
| { |
| |
| for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) { |
| CacheClientProxy proxy = (CacheClientProxy)iter.next(); |
| ClientProxyMembershipID proxyID = proxy.getProxyID(); |
| if (durableId.equals(proxyID.getDurableId())) { |
| if (proxy.isPrimary()) { |
| return true; |
| } |
| else { |
| return false; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Returns (possibly stale) map of queue sizes for all clients notified |
| * by this server. |
| * |
| * @return map with CacheClientProxy as key, and Integer as a value |
| */ |
| public Map getClientQueueSizes() |
| { |
| Map/*<ClientProxyMembershipID,Integer>*/ queueSizes = new HashMap(); |
| for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) { |
| CacheClientProxy proxy = (CacheClientProxy)iter.next(); |
| queueSizes.put(proxy.getProxyID(), Integer.valueOf(proxy.getQueueSize())); |
| } |
| return queueSizes; |
| } |
| |
| public int getDurableClientHAQueueSize(String durableClientId) { |
| CacheClientProxy ccp = getClientProxy(durableClientId); |
| if (ccp == null) { |
| return -1; |
| } |
| return ccp.getQueueSizeStat(); |
| } |
| |
| //closes the cq and drains the queue |
| public boolean closeClientCq(String durableClientId, String clientCQName) |
| throws CqException { |
| CacheClientProxy proxy = getClientProxy(durableClientId); |
| // close and drain |
| if (proxy != null) { |
| return proxy.closeClientCq(clientCQName); |
| } |
| return false; |
| } |
| |
| |
| /** |
| * Removes an existing <code>CacheClientProxy</code> from the list of known |
| * client proxies |
| * |
| * @param proxy |
| * The <code>CacheClientProxy</code> to remove |
| */ |
| protected void removeClientProxy(CacheClientProxy proxy) |
| { |
| // this._logger.info(LocalizedStrings.DEBUG, "removing client proxy " + proxy, new Exception("stack trace")); |
| ClientProxyMembershipID client = proxy.getProxyID(); |
| this._clientProxies.remove(client); |
| this._connectionListener.queueRemoved(); |
| ((GemFireCacheImpl)this.getCache()).cleanupForClient(this, client); |
| if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) { |
| ClientHealthMonitor chm = ClientHealthMonitor.getInstance(); |
| if (chm != null) { |
| chm.numOfClientsPerVersion |
| .decrementAndGet(proxy.getVersion().ordinal()); |
| } |
| } |
| |
| } |
| |
| void durableClientTimedOut(ClientProxyMembershipID client) { |
| this.timedOutDurableClientProxies.add(client); |
| } |
| |
| public boolean isTimedOut(ClientProxyMembershipID client) { |
| return this.timedOutDurableClientProxies.contains(client); |
| } |
| |
| /** |
| * Returns an unmodifiable Collection of known |
| * <code>CacheClientProxy</code> instances. |
| * The collection is not static so its contents may change. |
| * |
| * @return the collection of known <code>CacheClientProxy</code> instances |
| */ |
| public Collection<CacheClientProxy> getClientProxies() { |
| return Collections.unmodifiableCollection(this._clientProxies.values()); |
| } |
| |
| // /** |
| // * Returns the <code>Executor</code> that delivers messages to the |
| // * <code>CacheClientProxy</code> instances. |
| // * @return the <code>Executor</code> that delivers messages to the |
| // * <code>CacheClientProxy</code> instances |
| // */ |
| // protected Executor getExecutor() |
| // { |
| // return _executor; |
| // } |
| |
| private void closeAllClientCqs(CacheClientProxy proxy) { |
| CqService cqService = proxy.getCache().getCqService(); |
| if (cqService != null) { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); // LocalizedMessage.create( |
| try { |
| if (isDebugEnabled) { |
| logger.debug("CacheClientNotifier: Closing client CQs: {}", proxy); |
| } |
| cqService.closeClientCqs(proxy.getProxyID()); |
| } catch (CqException e1) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_UNABLE_TO_CLOSE_CQS_FOR_THE_CLIENT__0, proxy.getProxyID())); |
| if (isDebugEnabled) { |
| e1.printStackTrace(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Shuts down durable client proxy |
| * |
| */ |
| public boolean closeDurableClientProxy(String durableClientId) |
| throws CacheException { |
| CacheClientProxy ccp = getClientProxy(durableClientId); |
| if (ccp == null) { |
| return false; |
| } |
| //we can probably remove the isPaused check |
| if (ccp.isPaused() && !ccp.isConnected()) { |
| ccp.setKeepAlive(false); |
| closeDeadProxies(Collections.singletonList(ccp), true); |
| return true; |
| } |
| else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Cannot close running durable client: {}", durableClientId); |
| } |
| throw new CacheException("Cannot close a running durable client : " + durableClientId){}; |
| } |
| } |
| |
| /** |
| * Close dead <code>CacheClientProxy</code> instances |
| * |
| * @param deadProxies |
| * The list of <code>CacheClientProxy</code> instances to close |
| */ |
| private void closeDeadProxies(List deadProxies, boolean stoppedNormally) { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| for (Iterator i = deadProxies.iterator(); i.hasNext();) { |
| CacheClientProxy proxy = (CacheClientProxy)i.next(); |
| if (isDebugEnabled) |
| logger.debug("CacheClientNotifier: Closing dead client: {}", proxy); |
| |
| // Close the proxy |
| boolean keepProxy = false; |
| try { |
| keepProxy = proxy.close(false, stoppedNormally); |
| } |
| catch (CancelException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| } |
| |
| // Remove the proxy if necessary. It might not be necessary to remove the |
| // proxy if it is durable. |
| if (keepProxy) { |
| logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_KEEPING_PROXY_FOR_DURABLE_CLIENT_NAMED_0_FOR_1_SECONDS_2, new Object[] {proxy.getDurableId(), Integer.valueOf(proxy.getDurableTimeout()), proxy})); |
| } else { |
| closeAllClientCqs(proxy); |
| if (isDebugEnabled) { |
| logger.debug("CacheClientNotifier: Not keeping proxy for non-durable client: {}", proxy); |
| } |
| removeClientProxy(proxy); |
| } |
| proxy.notifyRemoval(); |
| } // for |
| } |
| |
| |
| /** |
| * Registers a new <code>InterestRegistrationListener</code> with the set of |
| * <code>InterestRegistrationListener</code>s. |
| * |
| * @param listener |
| * The <code>InterestRegistrationListener</code> to register |
| * |
| * @since 5.8Beta |
| */ |
| public void registerInterestRegistrationListener( |
| InterestRegistrationListener listener) { |
| this.writableInterestRegistrationListeners.add(listener); |
| } |
| |
| /** |
| * Unregisters an existing <code>InterestRegistrationListener</code> from |
| * the set of <code>InterestRegistrationListener</code>s. |
| * |
| * @param listener |
| * The <code>InterestRegistrationListener</code> to |
| * unregister |
| * |
| * @since 5.8Beta |
| */ |
| public void unregisterInterestRegistrationListener( |
| InterestRegistrationListener listener) { |
| this.writableInterestRegistrationListeners.remove(listener); |
| } |
| |
| /** |
| * Returns a read-only collection of <code>InterestRegistrationListener</code>s |
| * registered with this notifier. |
| * |
| * @return a read-only collection of <code>InterestRegistrationListener</code>s |
| * registered with this notifier |
| * |
| * @since 5.8Beta |
| */ |
| public Set getInterestRegistrationListeners() { |
| return this.readableInterestRegistrationListeners; |
| } |
| |
| /** |
| * |
| * @since 5.8Beta |
| */ |
| protected boolean containsInterestRegistrationListeners() { |
| return !this.writableInterestRegistrationListeners.isEmpty(); |
| } |
| |
| /** |
| * |
| * @since 5.8Beta |
| */ |
| protected void notifyInterestRegistrationListeners( |
| InterestRegistrationEvent event) { |
| for (Iterator i = this.writableInterestRegistrationListeners.iterator(); i |
| .hasNext();) { |
| InterestRegistrationListener listener = (InterestRegistrationListener)i |
| .next(); |
| if (event.isRegister()) { |
| listener.afterRegisterInterest(event); |
| } |
| else { |
| listener.afterUnregisterInterest(event); |
| } |
| } |
| } |
| |
| /** |
| * Test method used to determine the state of the CacheClientNotifier |
| * |
| * @return the statistics for the notifier |
| */ |
| public CacheClientNotifierStats getStats() |
| { |
| return this._statistics; |
| } |
| |
| /** |
| * Returns this <code>CacheClientNotifier</code>'s <code>Cache</code>. |
| * |
| * @return this <code>CacheClientNotifier</code>'s <code>Cache</code> |
| */ |
| protected Cache getCache() { // TODO:SYNC: looks wrong |
| if (this._cache != null && this._cache.isClosed()) { |
| GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); |
| if (cache != null) { |
| this._cache = cache; |
| this.logWriter = cache.getInternalLogWriter(); |
| this.securityLogWriter = cache.getSecurityInternalLogWriter(); |
| } |
| } |
| return this._cache; |
| } |
| |
| /** |
| * Returns this <code>CacheClientNotifier</code>'s maximum message count. |
| * |
| * @return this <code>CacheClientNotifier</code>'s maximum message count |
| */ |
| protected int getMaximumMessageCount() { |
| return this.maximumMessageCount; |
| } |
| |
| /** |
| * Returns this <code>CacheClientNotifier</code>'s message time-to-live. |
| * |
| * @return this <code>CacheClientNotifier</code>'s message time-to-live |
| */ |
| protected int getMessageTimeToLive() { |
| return this.messageTimeToLive; |
| } |
| |
| protected void handleInterestEvent(InterestRegistrationEvent event) { |
| LocalRegion region = (LocalRegion)event.getRegion(); |
| region.handleInterestEvent(event); |
| |
| } |
| |
| /** |
| * Constructor. |
| * |
| * @param cache |
| * The GemFire <code>Cache</code> |
| * @param acceptorStats |
| * @param maximumMessageCount |
| * @param messageTimeToLive |
| * @param transactionTimeToLive - ttl for txstates for disconnected clients |
| * @param listener a listener which should receive notifications |
| * abouts queues being added or removed. |
| * @param overflowAttributesList |
| */ |
| private CacheClientNotifier(Cache cache, CacheServerStats acceptorStats, |
| int maximumMessageCount, int messageTimeToLive, int transactionTimeToLive, |
| ConnectionListener listener, |
| List overflowAttributesList, boolean isGatewayReceiver) { |
| // Set the Cache |
| this.setCache((GemFireCacheImpl)cache); |
| this.acceptorStats = acceptorStats; |
| |
| // Set the LogWriter |
| this.logWriter = (InternalLogWriter)cache.getLogger(); |
| |
| this._connectionListener = listener; |
| |
| // Set the security LogWriter |
| this.securityLogWriter = (InternalLogWriter)cache.getSecurityLogger(); |
| |
| // Create the overflow artifacts |
| if (overflowAttributesList != null |
| && !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList |
| .get(0))) { |
| haContainer = new HAContainerRegion(cache.getRegion(Region.SEPARATOR |
| + BridgeServerImpl.clientMessagesRegion((GemFireCacheImpl)cache, |
| (String)overflowAttributesList.get(0), |
| ((Integer)overflowAttributesList.get(1)).intValue(), |
| ((Integer)overflowAttributesList.get(2)).intValue(), |
| (String)overflowAttributesList.get(3), |
| (Boolean)overflowAttributesList.get(4)))); |
| } |
| else { |
| haContainer = new HAContainerMap(new HashMap()); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("ha container ({}) has been created.", haContainer.getName()); |
| } |
| |
| this.maximumMessageCount = maximumMessageCount; |
| this.messageTimeToLive = messageTimeToLive; |
| this.transactionTimeToLive = transactionTimeToLive; |
| |
| // Initialize the statistics |
| StatisticsFactory factory ; |
| if(isGatewayReceiver){ |
| factory = new DummyStatisticsFactory(); |
| }else{ |
| factory = this.getCache().getDistributedSystem(); |
| } |
| this._statistics = new CacheClientNotifierStats(factory); |
| |
| // Initialize the executors |
| // initializeExecutors(this._logger); |
| |
| try { |
| this.logFrequency = Long.valueOf(System |
| .getProperty(MAX_QUEUE_LOG_FREQUENCY)); |
| if (this.logFrequency <= 0) { |
| this.logFrequency = DEFAULT_LOG_FREQUENCY; |
| } |
| } catch (Exception e) { |
| this.logFrequency = DEFAULT_LOG_FREQUENCY; |
| } |
| |
| eventEnqueueWaitTime = Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, |
| DEFAULT_EVENT_ENQUEUE_WAIT_TIME); |
| if (eventEnqueueWaitTime < 0) { |
| eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME; |
| } |
| |
| // Schedule task to periodically ping clients. |
| scheduleClientPingTask(); |
| } |
| |
| /** |
| * this message is used to send interest registration to another server. |
| * Since interest registration performs a state-flush operation this |
| * message must not transmitted on an ordered socket |
| */ |
| public static class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage |
| implements MessageWithReply { |
| ClientProxyMembershipID clientId; |
| ClientInterestMessageImpl clientMessage; |
| int processorId; |
| |
| ServerInterestRegistrationMessage(ClientProxyMembershipID clientID, |
| ClientInterestMessageImpl msg) { |
| this.clientId = clientID; |
| this.clientMessage = msg; |
| } |
| |
| public ServerInterestRegistrationMessage() { } |
| |
| static void sendInterestChange(DM dm, |
| ClientProxyMembershipID clientID, |
| ClientInterestMessageImpl msg) { |
| ServerInterestRegistrationMessage smsg = new ServerInterestRegistrationMessage( |
| clientID, msg); |
| Set recipients = dm.getOtherDistributionManagerIds(); |
| smsg.setRecipients(recipients); |
| ReplyProcessor21 rp = new ReplyProcessor21(dm, recipients); |
| smsg.processorId = rp.getProcessorId(); |
| dm.putOutgoing(smsg); |
| try { |
| rp.waitForReplies(); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| |
| /* (non-Javadoc) |
| * @see com.gemstone.gemfire.distributed.internal.DistributionMessage#process(com.gemstone.gemfire.distributed.internal.DistributionManager) |
| */ |
| @Override |
| protected void process(DistributionManager dm) { |
| // Get the proxy for the proxy id |
| try { |
| CacheClientNotifier ccn = CacheClientNotifier.getInstance(); |
| if (ccn != null) { |
| CacheClientProxy proxy = ccn.getClientProxy(clientId); |
| // If this VM contains a proxy for the requested proxy id, forward the |
| // message on to the proxy for processing |
| if (proxy != null) { |
| proxy.processInterestMessage(this.clientMessage); |
| } |
| } |
| } finally { |
| ReplyMessage reply = new ReplyMessage(); |
| reply.setProcessorId(this.processorId); |
| reply.setRecipient(getSender()); |
| try { |
| dm.putOutgoing(reply); |
| } catch (CancelException e) { |
| // can't send a reply, so ignore the exception |
| } |
| } |
| } |
| |
| /* (non-Javadoc) |
| * @see com.gemstone.gemfire.internal.DataSerializableFixedID#getDSFID() |
| */ |
| public int getDSFID() { |
| return SERVER_INTEREST_REGISTRATION_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| out.writeInt(this.processorId); |
| InternalDataSerializer.invokeToData(this.clientId, out); |
| InternalDataSerializer.invokeToData(this.clientMessage, out); |
| } |
| |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| super.fromData(in); |
| this.processorId = in.readInt(); |
| this.clientId = new ClientProxyMembershipID(); |
| InternalDataSerializer.invokeFromData(this.clientId, in); |
| this.clientMessage = new ClientInterestMessageImpl(); |
| InternalDataSerializer.invokeFromData(this.clientMessage, in); |
| } |
| |
| } |
| |
| |
| // * Initializes the <code>QueuedExecutor</code> and |
| // <code>PooledExecutor</code> |
| // * used to deliver messages to <code>CacheClientProxy</code> instances. |
| // * @param logger The GemFire <code>LogWriterI18n</code> |
| // */ |
| // private void initializeExecutors(LogWriterI18n logger) |
| // { |
| // // Create the thread groups |
| // final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup("Cache |
| // Client Notifier Logger Group", logger); |
| // final ThreadGroup notifierGroup = |
| // new ThreadGroup("Cache Client Notifier Group") |
| // { |
| // public void uncaughtException(Thread t, Throwable e) |
| // { |
| // Thread.dumpStack(); |
| // loggerGroup.uncaughtException(t, e); |
| // //CacheClientNotifier.exceptionInThreads = true; |
| // } |
| // }; |
| // |
| // // Originally set ThreadGroup to be a daemon, but it was causing the |
| // following |
| // // exception after five minutes of non-activity (the keep alive time of the |
| // // threads in the PooledExecutor. |
| // |
| // // java.lang.IllegalThreadStateException |
| // // at java.lang.ThreadGroup.add(Unknown Source) |
| // // at java.lang.Thread.init(Unknown Source) |
| // // at java.lang.Thread.<init>(Unknown Source) |
| // // at |
| // com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier$4.newThread(CacheClientNotifier.java:321) |
| // // at |
| // com.gemstone.edu.oswego.cs.dl.util.concurrent.PooledExecutor.addThread(PooledExecutor.java:512) |
| // // at |
| // com.gemstone.edu.oswego.cs.dl.util.concurrent.PooledExecutor.execute(PooledExecutor.java:888) |
| // // at |
| // com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier.notifyClients(CacheClientNotifier.java:95) |
| // // at |
| // com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:271) |
| // |
| // //notifierGroup.setDaemon(true); |
| // |
| // if (USE_QUEUED_EXECUTOR) |
| // createQueuedExecutor(notifierGroup); |
| // else |
| // createPooledExecutor(notifierGroup); |
| // } |
| |
| // /** |
| // * Creates the <code>QueuedExecutor</code> used to deliver messages |
| // * to <code>CacheClientProxy</code> instances |
| // * @param notifierGroup The <code>ThreadGroup</code> to which the |
| // * <code>QueuedExecutor</code>'s <code>Threads</code> belong |
| // */ |
| // protected void createQueuedExecutor(final ThreadGroup notifierGroup) |
| // { |
| // QueuedExecutor queuedExecutor = new QueuedExecutor(new LinkedQueue()); |
| // queuedExecutor.setThreadFactory(new ThreadFactory() |
| // { |
| // public Thread newThread(Runnable command) |
| // { |
| // Thread thread = new Thread(notifierGroup, command, "Queued Cache Client |
| // Notifier"); |
| // thread.setDaemon(true); |
| // return thread; |
| // } |
| // }); |
| // _executor = queuedExecutor; |
| // } |
| |
| // /** |
| // * Creates the <code>PooledExecutor</code> used to deliver messages |
| // * to <code>CacheClientProxy</code> instances |
| // * @param notifierGroup The <code>ThreadGroup</code> to which the |
| // * <code>PooledExecutor</code>'s <code>Threads</code> belong |
| // */ |
| // protected void createPooledExecutor(final ThreadGroup notifierGroup) |
| // { |
| // PooledExecutor pooledExecutor = new PooledExecutor(new |
| // BoundedLinkedQueue(4096), 50); |
| // pooledExecutor.setMinimumPoolSize(10); |
| // pooledExecutor.setKeepAliveTime(1000 * 60 * 5); |
| // pooledExecutor.setThreadFactory(new ThreadFactory() |
| // { |
| // public Thread newThread(Runnable command) |
| // { |
| // Thread thread = new Thread(notifierGroup, command, "Pooled Cache Client |
| // Notifier"); |
| // thread.setDaemon(true); |
| // return thread; |
| // } |
| // }); |
| // pooledExecutor.createThreads(5); |
| // _executor = pooledExecutor; |
| // } |
| |
| protected void deliverInterestChange(ClientProxyMembershipID proxyID, |
| ClientInterestMessageImpl message) { |
| DM dm = ((InternalDistributedSystem)this.getCache() |
| .getDistributedSystem()).getDistributionManager(); |
| ServerInterestRegistrationMessage.sendInterestChange(dm, proxyID, message); |
| } |
| |
| public CacheServerStats getAcceptorStats() { |
| return this.acceptorStats; |
| } |
| |
| public void addCompiledQuery(DefaultQuery query){ |
| if (this.compiledQueries.putIfAbsent(query.getQueryString(), query) == null){ |
| // Added successfully. |
| this._statistics.incCompiledQueryCount(1); |
| if (logger.isDebugEnabled()){ |
| logger.debug("Added compiled query into ccn.compliedQueries list. Query: {}. Total compiled queries: {}", query.getQueryString(), this._statistics.getCompiledQueryCount()); |
| } |
| // Start the clearIdleCompiledQueries thread. |
| startCompiledQueryCleanupThread(); |
| } |
| } |
| |
| public Query getCompiledQuery(String queryString){ |
| return this.compiledQueries.get(queryString); |
| } |
| |
| private void clearCompiledQueries(){ |
| if (this.compiledQueries.size() > 0){ |
| this._statistics.incCompiledQueryCount(-(this.compiledQueries.size())); |
| this.compiledQueries.clear(); |
| if (logger.isDebugEnabled()){ |
| logger.debug("Removed all compiled queries from ccn.compliedQueries list. Total compiled queries: {}", this._statistics.getCompiledQueryCount()); |
| } |
| } |
| } |
| |
| /** |
| * This starts the cleanup thread that periodically |
| * (DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME) checks for the |
| * compiled queries that are not used and removes them. |
| */ |
| private void startCompiledQueryCleanupThread() { |
| if (isCompiledQueryCleanupThreadStarted){ |
| return; |
| } |
| |
| SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() { |
| @Override |
| public void run2() { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| for (Map.Entry<String, DefaultQuery> e : compiledQueries.entrySet()){ |
| DefaultQuery q = e.getValue(); |
| // Check if the query last used flag. |
| // If its true set it to false. If its false it means it is not used |
| // from the its last checked. |
| if (q.getLastUsed()) { |
| q.setLastUsed(false); |
| } else { |
| if (compiledQueries.remove(e.getKey()) != null) { |
| // If successfully removed decrement the counter. |
| _statistics.incCompiledQueryCount(-1); |
| if (isDebugEnabled){ |
| logger.debug("Removed compiled query from ccn.compliedQueries list. Query: " + q.getQueryString() + ". Total compiled queries are : " + _statistics.getCompiledQueryCount()); |
| } |
| } |
| } |
| } |
| } |
| }; |
| |
| synchronized (lockIsCompiledQueryCleanupThreadStarted) { |
| if (!isCompiledQueryCleanupThreadStarted) { |
| long period = DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME > 0 ? |
| DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME:DefaultQuery.COMPILED_QUERY_CLEAR_TIME; |
| _cache.getCCPTimer().scheduleAtFixedRate(task, period, period); |
| } |
| isCompiledQueryCleanupThreadStarted = true; |
| } |
| } |
| |
| protected void scheduleClientPingTask() { |
| this.clientPingTask = new SystemTimer.SystemTimerTask() { |
| |
| @Override |
| public void run2() { |
| // If there are no proxies, return |
| if (CacheClientNotifier.this._clientProxies.isEmpty()) { |
| return; |
| } |
| |
| // Create ping message |
| ClientMessage message = new ClientPingMessageImpl(); |
| |
| // Determine clients to ping |
| for (CacheClientProxy proxy : getClientProxies()) { |
| logger.debug("Checking whether to ping {}", proxy); |
| // Ping clients whose version is GE 6.6.2.2 |
| if (proxy.getVersion().compareTo(Version.GFE_6622) >= 0) { |
| // Send the ping message directly to the client. Do not qo through |
| // the queue. If the queue were used, the secondary connection would |
| // not be pinged. Instead, pings would just build up in secondary |
| // queue and never be sent. The counter is used to help scalability. |
| // If normal messages are sent by the proxy, then the counter will |
| // be reset and no pings will be sent. |
| if (proxy.incrementAndGetPingCounter() >= CLIENT_PING_TASK_COUNTER) { |
| logger.debug("Pinging {}", proxy); |
| proxy.sendMessageDirectly(message); |
| logger.debug("Done pinging {}", proxy); |
| } else { |
| logger.debug("Not pinging because not idle: {}", proxy); |
| } |
| } else { |
| logger.debug("Ignoring because of version: {}", proxy); |
| } |
| } |
| } |
| }; |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Scheduling client ping task with period={} ms", CLIENT_PING_TASK_PERIOD); |
| } |
| CacheClientNotifier.this._cache.getCCPTimer() |
| .scheduleAtFixedRate(this.clientPingTask, CLIENT_PING_TASK_PERIOD, |
| CLIENT_PING_TASK_PERIOD); |
| } |
| |
| /** |
| * A string representing all hosts used for delivery purposes. |
| */ |
| protected static final String ALL_HOSTS = "ALL_HOSTS"; |
| |
| /** |
| * An int representing all ports used for delivery purposes. |
| */ |
| protected static final int ALL_PORTS = -1; |
| |
| // /** |
| // * Whether to synchonously deliver messages to proxies. |
| // * This is currently hard-coded to true to ensure ordering. |
| // */ |
| // protected static final boolean USE_SYNCHRONOUS_NOTIFICATION = |
| // true; |
| // Boolean.getBoolean("CacheClientNotifier.USE_SYNCHRONOUS_NOTIFICATION"); |
| |
| // /** |
| // * Whether to use the <code>QueuedExecutor</code> (or the |
| // * <code>PooledExecutor</code>) to deliver messages to proxies. |
| // * Currently, delivery is synchronous. No <code>Executor</code> is |
| // * used. |
| // */ |
| // protected static final boolean USE_QUEUED_EXECUTOR = |
| // Boolean.getBoolean("CacheClientNotifier.USE_QUEUED_EXECUTOR"); |
| |
| /** |
| * The map of known <code>CacheClientProxy</code> instances. |
| * Maps ClientProxyMembershipID to CacheClientProxy. |
| * Note that the keys in this map are not updated when a durable client reconnects. |
| * To make sure you get the updated ClientProxyMembershipID use this map to |
| * lookup the CacheClientProxy and then call getProxyID on it. |
| */ |
| private final ConcurrentMap/*<ClientProxyMembershipID, CacheClientProxy>*/ _clientProxies |
| = new ConcurrentHashMap(); |
| |
| /** |
| * The map of <code>CacheClientProxy</code> instances which are getting |
| * initialized. |
| * Maps ClientProxyMembershipID to CacheClientProxy. |
| */ |
| private final ConcurrentMap/*<ClientProxyMembershipID, CacheClientProxy>*/ _initClientProxies |
| = new ConcurrentHashMap(); |
| |
| private final HashSet<ClientProxyMembershipID> timedOutDurableClientProxies |
| = new HashSet<ClientProxyMembershipID>(); |
| |
| /** |
| * The GemFire <code>Cache</code>. Note that since this is a singleton class |
| * you should not use a direct reference to _cache in CacheClientNotifier code. |
| * Instead, you should always use <code>getCache()</code> |
| */ |
| private GemFireCacheImpl _cache; |
| |
| private InternalLogWriter logWriter; |
| |
| /** |
| * The GemFire security <code>LogWriter</code> |
| */ |
| private InternalLogWriter securityLogWriter; |
| |
| /** the maximum number of messages that can be enqueued in a client-queue. */ |
| private int maximumMessageCount; |
| |
| /** |
| * the time (in seconds) after which a message in the client queue will |
| * expire. |
| */ |
| private int messageTimeToLive; |
| |
| /** |
| * A listener which receives notifications |
| * about queues that are added or removed |
| */ |
| private ConnectionListener _connectionListener; |
| |
| private CacheServerStats acceptorStats; |
| |
| /** |
| * haContainer can hold either the name of the client-messages-region |
| * (in case of eviction policies "mem" or "entry") or an instance of HashMap |
| * (in case of eviction policy "none"). In both the cases, it'll store |
| * HAEventWrapper as its key and ClientUpdateMessage as its value. |
| */ |
| private final HAContainerWrapper haContainer; |
| |
| // /** |
| // * The singleton <code>CacheClientNotifier</code> instance |
| // */ |
| // protected static CacheClientNotifier _instance; |
| /** |
| * The size of the server-to-client communication socket buffers. This can be |
| * modified using the BridgeServer.SOCKET_BUFFER_SIZE system property. |
| */ |
| static final private int socketBufferSize = Integer.getInteger( |
| "BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue(); |
| |
| /** |
| * The statistics for this notifier |
| */ |
| protected final CacheClientNotifierStats _statistics; |
| |
| /** |
| * The <code>InterestRegistrationListener</code> instances registered in |
| * this VM. This is used when modifying the set of listeners. |
| */ |
| private final Set writableInterestRegistrationListeners = new CopyOnWriteArraySet(); |
| |
| /** |
| * The <code>InterestRegistrationListener</code> instances registered in |
| * this VM. This is used to provide a read-only <code>Set</code> of |
| * listeners. |
| */ |
| private final Set readableInterestRegistrationListeners = Collections |
| .unmodifiableSet(writableInterestRegistrationListeners); |
| |
| /** |
| * System property name for indicating how much frequently the "Queue full" |
| * message should be logged. |
| */ |
| public static final String MAX_QUEUE_LOG_FREQUENCY = "gemfire.logFrequency.clientQueueReachedMaxLimit"; |
| |
| public static final long DEFAULT_LOG_FREQUENCY = 1000; |
| |
| public static final String EVENT_ENQUEUE_WAIT_TIME_NAME = "gemfire.subscription.EVENT_ENQUEUE_WAIT_TIME"; |
| |
| public static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100; |
| |
| /** |
| * System property value denoting the time in milliseconds. Any thread putting |
| * an event into a subscription queue, which is full, will wait this much time |
| * for the queue to make space. It'll then enque the event possibly causing |
| * the queue to grow beyond its capacity/max-size. See #51400. |
| */ |
| public static int eventEnqueueWaitTime; |
| |
| /** |
| * The frequency of logging the "Queue full" message. |
| */ |
| private long logFrequency = DEFAULT_LOG_FREQUENCY; |
| |
| private final ConcurrentHashMap<String, DefaultQuery> compiledQueries = new ConcurrentHashMap<String, DefaultQuery>(); |
| |
| private volatile boolean isCompiledQueryCleanupThreadStarted = false; |
| |
| private final Object lockIsCompiledQueryCleanupThreadStarted = new Object(); |
| |
| private SystemTimer.SystemTimerTask clientPingTask; |
| |
| private static final long CLIENT_PING_TASK_PERIOD = |
| Long.getLong("gemfire.serverToClientPingPeriod", 60000); |
| |
| private static final long CLIENT_PING_TASK_COUNTER = |
| Long.getLong("gemfire.serverToClientPingCounter", 3); |
| |
| public long getLogFrequency() { |
| return this.logFrequency; |
| } |
| |
| /** |
| * @return the haContainer |
| */ |
| public Map getHaContainer() { |
| return haContainer; |
| } |
| |
| private final Set blackListedClients = new CopyOnWriteArraySet(); |
| |
| public void addToBlacklistedClient(ClientProxyMembershipID proxyID) { |
| blackListedClients.add(proxyID); |
| // ensure that cache and distributed system state are current and open |
| this.getCache(); |
| new ScheduledThreadPoolExecutor(1).schedule( |
| new ExpireBlackListTask(proxyID), 120, TimeUnit.SECONDS); |
| } |
| |
| public Set getBlacklistedClient() { |
| return blackListedClients; |
| } |
| |
| /** |
| * @param _cache the _cache to set |
| */ |
| private void setCache(GemFireCacheImpl _cache) { |
| this._cache = _cache; |
| } |
| |
| |
| private class ExpireBlackListTask extends PoolTask { |
| private ClientProxyMembershipID proxyID; |
| |
| public ExpireBlackListTask(ClientProxyMembershipID proxyID) { |
| this.proxyID = proxyID; |
| } |
| |
| @Override |
| public void run2() { |
| if (blackListedClients.remove(proxyID)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{} client is no longer blacklisted", proxyID); |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * @return the time-to-live for abandoned transactions, in seconds |
| */ |
| public int getTransactionTimeToLive() { |
| return this.transactionTimeToLive; |
| } |
| } |
| |