| /* |
| * ========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| * ========================================================================= |
| */ |
| |
| package com.gemstone.gemfire.internal.cache.tier.sockets; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.net.Socket; |
| import java.net.SocketException; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.regex.Pattern; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.CancelException; |
| import com.gemstone.gemfire.DataSerializer; |
| import com.gemstone.gemfire.StatisticsFactory; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.CacheClosedException; |
| import com.gemstone.gemfire.cache.CacheException; |
| import com.gemstone.gemfire.cache.ClientSession; |
| import com.gemstone.gemfire.cache.DynamicRegionFactory; |
| import com.gemstone.gemfire.cache.InterestRegistrationEvent; |
| import com.gemstone.gemfire.cache.InterestResultPolicy; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.RegionExistsException; |
| import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker; |
| import com.gemstone.gemfire.cache.operations.DestroyOperationContext; |
| import com.gemstone.gemfire.cache.operations.InvalidateOperationContext; |
| import com.gemstone.gemfire.cache.operations.OperationContext; |
| import com.gemstone.gemfire.cache.operations.PutOperationContext; |
| import com.gemstone.gemfire.cache.operations.RegionClearOperationContext; |
| import com.gemstone.gemfire.cache.operations.RegionCreateOperationContext; |
| import com.gemstone.gemfire.cache.operations.RegionDestroyOperationContext; |
| import com.gemstone.gemfire.cache.query.CqException; |
| import com.gemstone.gemfire.cache.query.CqQuery; |
| import com.gemstone.gemfire.cache.query.internal.cq.CqService; |
| import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; |
| import com.gemstone.gemfire.distributed.DistributedMember; |
| import com.gemstone.gemfire.distributed.internal.DistributionManager; |
| import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; |
| import com.gemstone.gemfire.internal.SocketCreator; |
| import com.gemstone.gemfire.internal.SystemTimer; |
| import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask; |
| import com.gemstone.gemfire.internal.Version; |
| import com.gemstone.gemfire.internal.cache.BridgeObserver; |
| import com.gemstone.gemfire.internal.cache.BridgeObserverHolder; |
| import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisee; |
| import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.InitialImageAdvice; |
| import com.gemstone.gemfire.internal.cache.Conflatable; |
| import com.gemstone.gemfire.internal.cache.DistributedRegion; |
| import com.gemstone.gemfire.internal.cache.EnumListenerEvent; |
| import com.gemstone.gemfire.internal.cache.EventID; |
| import com.gemstone.gemfire.internal.cache.FilterProfile; |
| import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; |
| import com.gemstone.gemfire.internal.cache.InterestRegistrationEventImpl; |
| import com.gemstone.gemfire.internal.cache.LocalRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegion; |
| import com.gemstone.gemfire.internal.cache.StateFlushOperation; |
| import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper; |
| import com.gemstone.gemfire.internal.cache.ha.HARegionQueue; |
| import com.gemstone.gemfire.internal.cache.ha.HARegionQueueAttributes; |
| import com.gemstone.gemfire.internal.cache.ha.HARegionQueueStats; |
| import com.gemstone.gemfire.internal.cache.tier.InterestType; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessageImpl.CqNameToOp; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.command.Get70; |
| import com.gemstone.gemfire.internal.cache.versions.VersionTag; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| import com.gemstone.gemfire.internal.logging.log4j.LogMarker; |
| import com.gemstone.gemfire.internal.security.AuthorizeRequestPP; |
| import com.gemstone.gemfire.security.AccessControl; |
| import com.gemstone.org.jgroups.util.StringId; |
| |
| /** |
| * Class <code>CacheClientProxy</code> represents the server side of the |
| * {@link CacheClientUpdater}. It queues messages to be sent from the server to |
| * the client. It then reads those messages from the queue and sends them to the |
| * client. |
| * |
| * @author Barry Oglesby |
| * |
| * @since 4.2 |
| */ |
| @SuppressWarnings("synthetic-access") |
| public class CacheClientProxy implements ClientSession { |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * The socket between the server and the client |
| */ |
| protected Socket _socket; |
| |
| /** |
| * A communication buffer used by each message we send to the client |
| */ |
| protected ByteBuffer _commBuffer; |
| |
| /** |
| * The remote host's IP address string (cached for convenience) |
| */ |
| protected String _remoteHostAddress; |
| |
| /** |
| * @guarded.By {@link #isMarkedForRemovalLock} |
| */ |
| protected boolean isMarkedForRemoval = false; |
| |
| /** |
| * @see #isMarkedForRemoval |
| */ |
| protected final Object isMarkedForRemovalLock = new Object(); |
| |
| /** |
| * The proxy id of the client represented by this proxy |
| */ |
| protected ClientProxyMembershipID proxyID; |
| |
| /** |
| * The GemFire cache |
| */ |
| protected final GemFireCacheImpl _cache; |
| |
| /** |
| * The list of keys that the client represented by this proxy is interested in |
| * (stored by region) |
| */ |
| protected final ClientInterestList[] cils = new ClientInterestList[2]; |
| |
| /** |
| * A thread that dispatches messages to the client |
| */ |
| protected volatile MessageDispatcher _messageDispatcher; |
| |
| /** |
| * The statistics for this proxy |
| */ |
| protected final CacheClientProxyStats _statistics; |
| |
| protected final AtomicReference _durableExpirationTask = new AtomicReference(); |
| |
| protected SystemTimer durableTimer; |
| |
| /** |
| * Whether this dispatcher is paused |
| */ |
| protected volatile boolean _isPaused = true; |
| |
| /** |
| * True if we are connected to a client. |
| */ |
| private volatile boolean connected = false; |
| // /** |
| // * A string representing interest in all keys |
| // */ |
| // protected static final String ALL_KEYS = "ALL_KEYS"; |
| // |
| /** |
| * True if a marker message is still in the ha queue. |
| */ |
| private boolean markerEnqueued = false; |
| |
| /** |
| * The number of times to peek on shutdown before giving up and shutting down |
| */ |
| protected static final int MAXIMUM_SHUTDOWN_PEEKS = Integer.getInteger("gemfire.MAXIMUM_SHUTDOWN_PEEKS",50).intValue(); |
| |
| /** |
| * The number of milliseconds to wait for an offering to the message queue |
| */ |
| protected static final int MESSAGE_OFFER_TIME = 0; |
| |
| /** |
| * The default maximum message queue size |
| */ |
| // protected static final int MESSAGE_QUEUE_SIZE_DEFAULT = 230000; |
| |
| /** The message queue size */ |
| protected final int _maximumMessageCount; |
| |
| /** |
| * The time (in seconds ) after which a message in the client queue will |
| * expire. |
| */ |
| protected final int _messageTimeToLive; |
| |
| /** |
| * The <code>CacheClientNotifier</code> registering this proxy. |
| */ |
| protected final CacheClientNotifier _cacheClientNotifier; |
| |
| /** |
| * Defaults to true; meaning do some logging of dropped client notification |
| * messages. Set the system property to true to cause dropped messages to NOT |
| * be logged. |
| */ |
| protected static final boolean LOG_DROPPED_MSGS = !Boolean |
| .getBoolean("gemfire.disableNotificationWarnings"); |
| |
| /** |
| * for testing purposes, delays the start of the dispatcher thread |
| */ |
| public static boolean isSlowStartForTesting = false; |
| |
| /** |
| * Default value for slow starting time of dispatcher |
| */ |
| private static final long DEFAULT_SLOW_STARTING_TIME = 5000; |
| |
| /** |
| * Key in the system property from which the slow starting time value will be |
| * retrieved |
| */ |
| private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting"; |
| |
| private boolean isPrimary; |
| |
| /** @since 5.7 */ |
| protected byte clientConflation = HandShake.CONFLATION_DEFAULT; |
| |
| /** |
| * Flag to indicate whether to keep a durable client's queue alive |
| */ |
| boolean keepalive = false; |
| |
| private AccessControl postAuthzCallback; |
| |
| /** |
| * For multiuser environment.. |
| */ |
| private ClientUserAuths clientUserAuths; |
| |
| private final Object clientUserAuthsLock = new Object(); |
| |
| /** |
| * The version of the client |
| */ |
| private Version clientVersion; |
| |
| /** |
| * A map of region name as key and integer as its value. Basically, it stores |
| * the names of the regions with <code>DataPolicy</code> as EMPTY. If an |
| * event's region name is present in this map, it's full value (and not |
| * delta) is sent to the client represented by this proxy. |
| * |
| * @since 6.1 |
| */ |
| private volatile Map regionsWithEmptyDataPolicy = new HashMap(); |
| |
| /** |
| * A debug flag used for testing Backward compatibility |
| */ |
| public static boolean AFTER_MESSAGE_CREATION_FLAG = false; |
| |
| /** |
| * Notify the region when a client interest registration occurs. This tells |
| * the region to update access time when an update is to be pushed to a |
| * client. It is enabled only for <code>PartitionedRegion</code>s |
| * currently. |
| */ |
| protected static final boolean NOTIFY_REGION_ON_INTEREST = Boolean |
| .getBoolean("gemfire.updateAccessTimeOnClientInterest"); |
| |
| /** |
| * The AcceptorImpl identifier to which the proxy is connected. |
| */ |
| private final long _acceptorId; |
| |
| /** acceptor's setting for notifyBySubscription */ |
| private final boolean notifyBySubscription; |
| |
| /** To queue the events arriving during message dispatcher initialization */ |
| private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents = new ConcurrentLinkedQueue<Conflatable>(); |
| |
| private final Object queuedEventsSync = new Object(); |
| |
| private volatile boolean messageDispatcherInit = false; |
| |
| /** |
| * A counter that keeps track of how many task iterations that have occurred |
| * since the last ping or message. The |
| * {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} |
| * increments it. Normal messages sent to the client reset it. If the counter |
| * reaches 3, a ping is sent. |
| */ |
| private final AtomicInteger pingCounter = new AtomicInteger(); |
| |
| |
| /** Date on which this instances was created */ |
| private Date creationDate; |
| |
| /** true when the durable client associated with this proxy is being |
| * restarted and prevents cqs from being closed and drained**/ |
| private boolean drainLocked = false; |
| private final Object drainLock = new Object(); |
| |
| /** number of cq drains that are currently in progress **/ |
| private int numDrainsInProgress = 0; |
| private final Object drainsInProgressLock = new Object(); |
| |
| /** |
|  * Creates the server-side proxy for one client connection. |
|  * |
|  * @param ccn |
|  *          the <code>CacheClientNotifier</code> registering this proxy |
|  * @param socket |
|  *          the socket between the server and the client |
|  * @param proxyID |
|  *          the membership id representing the connection proxy of the client |
|  * @param isPrimary |
|  *          whether this proxy is the primary |
|  * @param clientConflation |
|  *          the client's conflation setting |
|  * @param clientVersion |
|  *          the version of the client |
|  * @param acceptorId |
|  *          identifier of the AcceptorImpl to which the proxy is connected |
|  * @param notifyBySubscription |
|  *          the acceptor's setting for notifyBySubscription |
|  * @throws CacheException |
|  *           declared by the signature; no statement visible here throws it |
|  *           directly |
|  */ |
| protected CacheClientProxy(CacheClientNotifier ccn, Socket socket, |
|     ClientProxyMembershipID proxyID, boolean isPrimary, byte clientConflation, |
|     Version clientVersion, long acceptorId, boolean notifyBySubscription) |
|     throws CacheException { |
|   // The socket, proxy id and remote address are needed below to name the statistics. |
|   initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion); |
| |
|   // Values taken from the registering notifier and the acceptor. |
|   this._cacheClientNotifier = ccn; |
|   this._cache = (GemFireCacheImpl) ccn.getCache(); |
|   this._maximumMessageCount = ccn.getMaximumMessageCount(); |
|   this._messageTimeToLive = ccn.getMessageTimeToLive(); |
|   this._acceptorId = acceptorId; |
|   this.notifyBySubscription = notifyBySubscription; |
| |
|   // Statistics are named after the member id and the remote endpoint. |
|   StatisticsFactory statsFactory = this._cache.getDistributedSystem(); |
|   String statsName = "id_" + this.proxyID.getDistributedMember().getId() |
|       + "_at_" + this._remoteHostAddress + ":" + this._socket.getPort(); |
|   this._statistics = new CacheClientProxyStats(statsFactory, statsName); |
| |
|   // One interest list keyed by the proxy id, one keyed by the durable id. |
|   this.cils[RegisterInterestTracker.interestListIndex] = |
|       new ClientInterestList(this, this.proxyID); |
|   this.cils[RegisterInterestTracker.durableInterestListIndex] = |
|       new ClientInterestList(this, this.getDurableId()); |
| |
|   this.postAuthzCallback = null; |
|   this._cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections(); |
|   this.creationDate = new Date(); |
|   initializeClientAuths(); |
| } |
| |
| /** |
|  * Looks up the user authorizations for this proxy's client, but only when |
|  * <code>AcceptorImpl</code> reports that a post-authorization callback is |
|  * present. |
|  */ |
| private void initializeClientAuths() { |
|   if (!AcceptorImpl.isPostAuthzCallbackPresent()) { |
|     return; |
|   } |
|   this.clientUserAuths = ServerConnection.getClientUserAuths(this.proxyID); |
| } |
| |
| private void reinitializeClientAuths() |
| { |
| if (this.clientUserAuths != null && AcceptorImpl.isPostAuthzCallbackPresent()) { |
| synchronized (this.clientUserAuthsLock) { |
| ClientUserAuths newClientAuth = ServerConnection.getClientUserAuths(this.proxyID); |
| newClientAuth.fillPreviousCQAuth(this.clientUserAuths); |
| this.clientUserAuths = newClientAuth; |
| } |
| } |
| } |
| |
| /** |
|  * Installs a new post-authorization callback, closing the one it replaces |
|  * (if any). Runs under <code>clientUserAuthsLock</code>. |
|  * |
|  * @param authzCallback the callback to install; stored as given |
|  */ |
| public void setPostAuthzCallback(AccessControl authzCallback) { |
|   //TODO:hitesh synchronization |
|   synchronized (this.clientUserAuthsLock) { |
|     final AccessControl previousCallback = this.postAuthzCallback; |
|     if (previousCallback != null) { |
|       previousCallback.close(); |
|     } |
|     this.postAuthzCallback = authzCallback; |
|   } |
| } |
| |
| /** |
|  * Records the user authorization attributes for a CQ. Only applies to the |
|  * multiuser case, i.e. when no single post-authorization callback is set |
|  * and user authorizations exist; otherwise the call is a no-op. |
|  * |
|  * @param cqName the name of the CQ |
|  * @param uniqueId the unique id passed through to the user authorizations |
|  * @param isDurable whether the CQ is durable |
|  */ |
| public void setCQVsUserAuth(String cqName, long uniqueId, boolean isDurable) { |
|   if (this.postAuthzCallback != null) { |
|     // single-user configuration: nothing to record |
|     return; |
|   } |
|   // Read the field once: closing the proxy sets it to null, so checking |
|   // and then dereferencing the field separately could hit an NPE. |
|   final ClientUserAuths auths = this.clientUserAuths; |
|   if (auths != null) { |
|     auths.setUserAuthAttributesForCq(cqName, uniqueId, isDurable); |
|   } |
| } |
| |
| private void initializeTransientFields(Socket socket, |
| ClientProxyMembershipID pid, boolean ip, byte cc, Version vers) { |
| this._socket = socket; |
| this.proxyID = pid; |
| this.connected = true; |
| { |
| int bufSize = 1024; |
| try { |
| bufSize = _socket.getSendBufferSize(); |
| if (bufSize < 1024) { |
| bufSize = 1024; |
| } |
| } catch (SocketException ignore) { |
| } |
| this._commBuffer = ServerConnection.allocateCommBuffer(bufSize, socket); |
| } |
| this._remoteHostAddress = socket.getInetAddress().getHostAddress(); |
| this.isPrimary = ip; |
| this.clientConflation = cc; |
| this.clientVersion = vers; |
| } |
| |
| public boolean isMarkerEnqueued() { |
| return markerEnqueued; |
| } |
| |
| public void setMarkerEnqueued(boolean bool) { |
| markerEnqueued = bool; |
| } |
| |
| public long getAcceptorId(){ |
| return this._acceptorId; |
| } |
| |
| /** |
| * @return the notifyBySubscription |
| */ |
| public boolean isNotifyBySubscription() { |
| return this.notifyBySubscription; |
| } |
| |
| |
| /** |
|  * Returns the membership id of the client represented by this proxy. |
|  * |
|  * @return the client's <code>ClientProxyMembershipID</code> |
|  */ |
| public ClientProxyMembershipID getProxyID() { |
|   return proxyID; |
| } |
| |
| // the following code was commented out simply because it was not used |
| // /** |
| // * Determines if the proxy represents the client host (and only the host, not |
| // * necessarily the exact VM running on the host) |
| // * |
| // * @return Whether the proxy represents the client host |
| // */ |
| // protected boolean representsClientHost(String clientHost) |
| // { |
| // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings |
| // return this._remoteHostAddress.equals(clientHost); |
| // } |
| |
| // protected boolean representsClientVM(DistributedMember remoteMember) |
| // { |
| // // logger.warn("Is input port " + clientPort + " contained in " + |
| // // logger.warn("Does input host " + clientHost + " equal " + |
| // // this._remoteHostAddress+ ": " + representsClientHost(clientHost)); |
| // // logger.warn("representsClientVM: " + |
| // // (representsClientHost(clientHost) && containsPort(clientPort))); |
| // return (proxyID.getDistributedMember().equals(remoteMember)); |
| // } |
| |
| // /** |
| // * Determines if the CacheClientUpdater proxied by this instance is listening |
| // * on the input clientHost and clientPort |
| // * |
| // * @param clientHost |
| // * The host name of the client to compare |
| // * @param clientPort |
| // * The port number of the client to compare |
| // * |
| // * @return Whether the CacheClientUpdater proxied by this instance is |
| // * listening on the input clientHost and clientPort |
| // */ |
| // protected boolean representsCacheClientUpdater(String clientHost, |
| // int clientPort) |
| // { |
| // return (clientPort == this._socket.getPort() && representsClientHost(clientHost)); |
| // } |
| |
| protected boolean isMember(ClientProxyMembershipID memberId) |
| { |
| return this.proxyID.equals(memberId); |
| } |
| |
| protected boolean isSameDSMember(ClientProxyMembershipID memberId) |
| { |
| return this.proxyID.isSameDSMember(memberId); |
| } |
| |
| /** |
|  * Sets the queue keepalive option. |
|  * |
|  * @param option |
|  *          true to keep the durable client's queue alive, false otherwise |
|  */ |
| protected void setKeepAlive(boolean option) |
| { |
|   keepalive = option; |
| } |
| |
| /** |
|  * Returns the socket connecting the server to the client. |
|  * |
|  * @return the current socket |
|  */ |
| protected Socket getSocket() { |
|   return _socket; |
| } |
| |
| /** |
|  * Returns the host address of the socket's remote endpoint, asked from the |
|  * socket itself rather than taken from the cached address string. |
|  * |
|  * @return the host address string of the socket's inet address |
|  */ |
| public String getSocketHost() { |
|   final String hostAddress = this._socket.getInetAddress().getHostAddress(); |
|   return hostAddress; |
| } |
| |
| /** |
|  * Returns the communication buffer used for messages sent to the client. |
|  * |
|  * @return the current buffer; null once it has been released |
|  */ |
| protected ByteBuffer getCommBuffer() |
| { |
|   return _commBuffer; |
| } |
| |
| /** |
|  * Returns the cached IP address string of the remote host. |
|  * |
|  * @return the cached address; null after the transient fields were closed |
|  */ |
| protected String getRemoteHostAddress() { |
|   return _remoteHostAddress; |
| } |
| |
| /** |
|  * Returns the port of the remote host, as reported by the socket. |
|  * |
|  * @return the socket's remote port |
|  */ |
| public int getRemotePort() { |
|   final int remotePort = this._socket.getPort(); |
|   return remotePort; |
| } |
| |
| /** |
| * Returns whether the proxy is connected to a remote client |
| * |
| * @return whether the proxy is connected to a remote client |
| */ |
| public boolean isConnected() { |
| return this.connected; |
| } |
| |
| /** |
| * Mark the receiver as needing removal |
| * @return true if it was already marked for removal |
| */ |
| protected boolean startRemoval() { |
| boolean result; |
| synchronized (this.isMarkedForRemovalLock) { |
| result = this.isMarkedForRemoval; |
| this.isMarkedForRemoval = true; |
| } |
| return result; |
| } |
| |
| /** |
| * Wait until the receiver's removal has completed before |
| * returning. |
| * @return true if the proxy was initially marked for removal |
| */ |
| protected boolean waitRemoval() { |
| boolean result; |
| synchronized (this.isMarkedForRemovalLock) { |
| result = this.isMarkedForRemoval; |
| boolean interrupted = false; |
| try { |
| while (this.isMarkedForRemoval) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Waiting for CacheClientProxy removal: {}", this); |
| } |
| try { |
| this.isMarkedForRemovalLock.wait(); |
| } |
| catch (InterruptedException e) { |
| interrupted = true; |
| this._cache.getCancelCriterion().checkCancelInProgress(e); |
| } |
| } // while |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // synchronized |
| return result; |
| } |
| |
| /** |
| * Indicate that removal has completed on this instance |
| */ |
| protected void notifyRemoval() { |
| synchronized (this.isMarkedForRemovalLock) { |
| this.isMarkedForRemoval = false; |
| this.isMarkedForRemovalLock.notifyAll(); |
| } |
| } |
| |
| /** |
|  * Returns the GemFire cache this proxy belongs to. |
|  * |
|  * @return the cache obtained from the notifier at construction |
|  */ |
| public GemFireCacheImpl getCache() { |
|   return _cache; |
| } |
| |
| /** |
|  * Collects the regions held by both of this proxy's interest lists. |
|  * |
|  * @return a new set containing the <code>regions</code> of every interest |
|  *         list; empty when neither list has any |
|  */ |
| public Set<String> getInterestRegisteredRegions() { |
|   final HashSet<String> regionNames = new HashSet<String>(); |
|   for (ClientInterestList interestList : this.cils) { |
|     if (interestList.regions.isEmpty()) { |
|       continue; |
|     } |
|     regionNames.addAll(interestList.regions); |
|   } |
|   return regionNames; |
| } |
| |
| /** |
|  * Returns the statistics object of this proxy. |
|  * |
|  * @return the statistics created at construction |
|  */ |
| public CacheClientProxyStats getStatistics() { |
|   return _statistics; |
| } |
| |
| /** |
|  * Returns the <code>CacheClientNotifier</code> that registered this proxy. |
|  * |
|  * @return this proxy's <code>CacheClientNotifier</code> |
|  */ |
| protected CacheClientNotifier getCacheClientNotifier() |
| { |
|   return _cacheClientNotifier; |
| } |
| |
| /** |
|  * Returns the size of the queue for heuristic purposes. The size may be |
|  * changing concurrently if puts/gets are occurring at the same time. |
|  * |
|  * @return the dispatcher's queue size, or 0 when there is no dispatcher |
|  */ |
| public int getQueueSize() { |
|   if (this._messageDispatcher == null) { |
|     return 0; |
|   } |
|   return this._messageDispatcher.getQueueSize(); |
| } |
| |
| /** |
|  * Returns the queue size calculated through stats. |
|  * |
|  * @return the dispatcher's stat-based queue size, or 0 when there is no |
|  *         dispatcher |
|  */ |
| public int getQueueSizeStat() { |
|   if (this._messageDispatcher == null) { |
|     return 0; |
|   } |
|   return this._messageDispatcher.getQueueSizeStat(); |
| } |
| |
| |
| /** |
|  * Tells whether at least one cq drain is currently running. |
|  * |
|  * @return true if the in-progress drain counter is greater than zero |
|  */ |
| public boolean drainInProgress() { |
|   final boolean draining; |
|   synchronized (drainsInProgressLock) { |
|     draining = numDrainsInProgress > 0; |
|   } |
|   return draining; |
| } |
| |
| /** |
|  * Called from CacheClientNotifier when attempting to restart a paused |
|  * proxy. Acquiring the drain lock requires that no drains are in progress |
|  * at the moment of acquisition and that the lock is not already held. |
|  * |
|  * @return true if this call acquired the drain lock; false if a drain is |
|  *         in progress or the lock was already taken |
|  */ |
| public boolean lockDrain() { |
|   synchronized (drainsInProgressLock) { |
|     if (drainInProgress()) { |
|       return false; |
|     } |
|     synchronized (drainLock) { |
|       if (testHook != null) { |
|         testHook.doTestHook("PRE_ACQUIRE_DRAIN_LOCK_UNDER_SYNC"); |
|       } |
|       if (drainLocked) { |
|         // prevent multiple lockings of drain lock |
|         return false; |
|       } |
|       drainLocked = true; |
|       return true; |
|     } |
|   } |
| } |
| |
| //Called from CacheClientNotifier when completed restart of proxy |
| public void unlockDrain() { |
| if (testHook != null) { |
| testHook.doTestHook("PRE_RELEASE_DRAIN_LOCK"); |
| } |
| synchronized(drainLock) { |
| drainLocked = false; |
| } |
| } |
| |
| //Only close the client cq if it is paused and no one is attempting to restart the proxy |
| public boolean closeClientCq(String clientCQName) throws CqException { |
| if (testHook != null) { |
| testHook.doTestHook("PRE_DRAIN_IN_PROGRESS"); |
| } |
| synchronized(drainsInProgressLock) { |
| numDrainsInProgress ++; |
| } |
| if (testHook != null) { |
| testHook.doTestHook("DRAIN_IN_PROGRESS_BEFORE_DRAIN_LOCK_CHECK"); |
| } |
| try { |
| //If the drain lock was acquired, the other thread did so before we could bump up |
| //the numDrainsInProgress. That means we need to stop. |
| if (drainLocked) { |
| // someone is trying to restart a paused proxy |
| String msg = LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_RESTARTING_DURABLE_CLIENT.toLocalizedString(clientCQName, proxyID.getDurableId()); |
| logger.info(msg); |
| throw new CqException(msg); |
| } |
| //isConnected is to protect against the case where a durable client has reconnected |
| //but has not yet sent a ready for events message |
| //we can probably remove the isPaused check |
| if (isPaused() && !isConnected()) { |
| CqService cqService = getCache().getCqService(); |
| if (cqService != null) { |
| InternalCqQuery cqToClose = cqService.getCq(cqService.constructServerCqName( |
| clientCQName, this.proxyID)); |
| // close and drain |
| if (cqToClose != null) { |
| cqService.closeCq(clientCQName, this.proxyID); |
| this._messageDispatcher.drainClientCqEvents(this.proxyID, cqToClose); |
| } |
| else { |
| String msg = LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_CLOSE_THE_SPECIFIED_CQ_0.toLocalizedString(clientCQName); |
| logger.info(msg); |
| throw new CqException(msg); |
| } |
| } |
| } else { |
| String msg = LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_ACTIVE_DURABLE_CLIENT.toLocalizedString(clientCQName, proxyID.getDurableId()); |
| logger.info(msg); |
| throw new CqException(msg); |
| } |
| } finally { |
| synchronized (drainsInProgressLock) { |
| numDrainsInProgress--; |
| } |
| if (testHook != null) { |
| testHook.doTestHook("DRAIN_COMPLETE"); |
| } |
| |
| } |
| return true; |
| } |
| |
| |
| /** |
|  * Tells whether the proxy is alive, i.e. whether it has a message |
|  * dispatcher that has not been stopped. |
|  * |
|  * @return false when there is no dispatcher or the dispatcher is stopped |
|  */ |
| protected boolean isAlive() { |
|   return this._messageDispatcher != null |
|       && !this._messageDispatcher.isStopped(); |
| } |
| |
| /** |
|  * Tells whether the proxy is paused. This only applies to durable clients. |
|  * |
|  * @return the current value of the paused flag |
|  * |
|  * @since 5.5 |
|  */ |
| protected boolean isPaused() |
| { |
|   return _isPaused; |
| } |
| |
| /** |
|  * Sets the paused flag directly, without notifying anyone. |
|  * |
|  * @param isPaused the new value of the paused flag |
|  */ |
| protected void setPaused(boolean isPaused) |
| { |
|   _isPaused = isPaused; |
| } |
| |
| /** |
| * Closes the proxy. This method checks the message queue for any unprocessed |
| * messages and processes them for MAXIMUM_SHUTDOWN_PEEKS. |
| * |
| * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS |
| */ |
| protected void close() |
| { |
| close(true, false); |
| } |
| |
| /** |
|  * Flipped to true once this proxy starts being closed and never reset, so |
|  * it stays true for the rest of the proxy's existence. Initially false. |
|  */ |
| private final AtomicBoolean closing = new AtomicBoolean(); |
| |
| /** |
| * Close the <code>CacheClientProxy</code>. |
| * |
| * @param checkQueue |
| * Whether to message check the queue and process any contained |
| * messages (up to MAXIMUM_SHUTDOWN_PEEKS). |
| * @param stoppedNormally |
| * Whether client stopped normally |
| * |
| * @return whether to keep this <code>CacheClientProxy</code> |
| * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS |
| */ |
| protected boolean close(boolean checkQueue, boolean stoppedNormally) { |
| boolean pauseDurable = false; |
| // If the client is durable and either (a) it hasn't stopped normally or (b) it |
| // has stopped normally but it is configured to be kept alive, set pauseDurable |
| // to true |
| if (isDurable() |
| && (!stoppedNormally || (getDurableKeepAlive() && stoppedNormally))) { |
| pauseDurable = true; |
| } |
| |
| boolean keepProxy = false; |
| if (pauseDurable) { |
| pauseDispatching(); |
| keepProxy = true; |
| } else { |
| terminateDispatching(checkQueue); |
| closeTransientFields(); |
| } |
| |
| this.connected = false; |
| |
| // Close the Authorization callback (if any) |
| try { |
| if (!pauseDurable) { |
| if (this.postAuthzCallback != null) {//for single user |
| this.postAuthzCallback.close(); |
| this.postAuthzCallback = null; |
| }else if(this.clientUserAuths != null) {//for multiple users |
| this.clientUserAuths.cleanup(true); |
| this.clientUserAuths = null; |
| } |
| } |
| } |
| catch (Exception ex) { |
| if (this._cache.getSecurityLoggerI18n().warningEnabled()) { |
| this._cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON, new Object[] {this, ex}); |
| } |
| } |
| // Notify the caller whether to keep this proxy. If the proxy is durable |
| // and should be paused, then return true; otherwise return false. |
| return keepProxy; |
| } |
| |
| protected void pauseDispatching() { |
| if (this._messageDispatcher == null){ |
| return; |
| } |
| |
| // If this is the primary, pause the dispatcher (which closes its transient |
| // fields. Otherwise, just close the transient fields. |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Pausing processing", this); |
| } |
| //BUGFIX for BUG#38234 |
| if(!testAndSetPaused(true) && this.isPrimary) { |
| if (this._messageDispatcher != Thread.currentThread()) { |
| // don't interrupt ourself to fix bug 40611 |
| this._messageDispatcher.interrupt(); |
| } |
| } |
| |
| try { |
| // Close transient fields |
| closeTransientFields(); |
| } finally { |
| // make sure this gets called if closeTransientFields throws; see bug 40611 |
| // Start timer |
| scheduleDurableExpirationTask(); |
| } |
| } |
| |
| private boolean testAndSetPaused(boolean newValue) { |
| |
| synchronized(this._messageDispatcher._pausedLock) { |
| if (this._isPaused != newValue) { |
| this._isPaused = newValue; |
| this._messageDispatcher._pausedLock.notifyAll(); |
| return !this._isPaused; |
| } |
| else { |
| this._messageDispatcher._pausedLock.notifyAll(); |
| return this._isPaused; |
| } |
| } |
| } |
| protected void terminateDispatching(boolean checkQueue) { |
| if (this._messageDispatcher == null){ |
| return; |
| } |
| |
| try { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Terminating processing", this); |
| } |
| if (this._messageDispatcher == Thread.currentThread()) { |
| // I'm not even sure this is possible but if the dispatcher |
| // calls us then at least call stopDispatching |
| // the old code did this (I'm not even sure it is safe to do). |
| // This needs to be done without testing OR setting "closing". |
| this._messageDispatcher.stopDispatching(checkQueue); |
| this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList(); |
| this.cils[RegisterInterestTracker.durableInterestListIndex].clearClientInterestList(); |
| // VJR: bug 37487 fix |
| destroyRQ(); |
| return; |
| } |
| |
| if (!this.closing.compareAndSet(false, true)) { |
| // must already be closing so just return |
| // this is part of the fix for 37684 |
| return; |
| } |
| // Unregister interest in all interests (if necessary) |
| this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList(); |
| this.cils[RegisterInterestTracker.durableInterestListIndex].clearClientInterestList(); |
| |
| // If the message dispatcher is paused, unpause it. The next bit of |
| // code will interrupt the waiter. |
| if (this.testAndSetPaused(false)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Paused but terminating processing", this); |
| } |
| // Cancel the expiration task |
| cancelDurableExpirationTask(false); |
| } |
| |
| boolean alreadyDestroyed = false; |
| boolean gotInterrupt = Thread.interrupted(); // clears the flag |
| try { |
| // Stop the message dispatcher |
| this._messageDispatcher.stopDispatching(checkQueue); |
| |
| gotInterrupt |= Thread.interrupted(); // clears the flag |
| |
| // to fix bug 37684 |
| // 1. check to see if dispatcher is still alive |
| if (this._messageDispatcher.isAlive()) { |
| if (this._socket != null && !this._socket.isClosed()) { |
| SocketCreator.asyncClose(this._socket, this._remoteHostAddress, null); |
| getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections(); |
| } |
| destroyRQ(); |
| alreadyDestroyed = true; |
| this._messageDispatcher.interrupt(); |
| if (this._messageDispatcher.isAlive()) { |
| try { |
| this._messageDispatcher.join(1000); |
| } catch (InterruptedException ex) { |
| gotInterrupt = true; |
| } |
| // if it is still alive then warn and move on |
| if (this._messageDispatcher.isAlive()) { |
| //com.gemstone.gemfire.internal.OSProcess.printStacks(com.gemstone.gemfire.internal.OSProcess.getId()); |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_COULD_NOT_STOP_MESSAGE_DISPATCHER_THREAD, this)); |
| } |
| } |
| } |
| } |
| finally { |
| if (gotInterrupt) { |
| Thread.currentThread().interrupt(); |
| } |
| if (!alreadyDestroyed) { |
| destroyRQ(); |
| } |
| } |
| } finally { |
| // Close the statistics |
| this._statistics.close(); // fix for bug 40105 |
| closeTransientFields(); // make sure this happens |
| } |
| } |
| |
| private void closeTransientFields() { |
| // Close the socket |
| if (this._socket != null && !this._socket.isClosed()) { |
| try { |
| this._socket.close(); |
| getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections(); |
| } catch (IOException e) {/*ignore*/} |
| } |
| |
| // Null out comm buffer, host address, ports and proxy id. All will be |
| // replaced when the client reconnects. |
| releaseCommBuffer(); |
| this._remoteHostAddress = null; |
| try { |
| this.cils[RegisterInterestTracker.interestListIndex].clearClientInterestList(); |
| } catch (CacheClosedException e) { |
| // ignore if cache is shutting down |
| } |
| // Commented to fix bug 40259 |
| //this.clientVersion = null; |
| closeNonDurableCqs(); |
| } |
| |
| private void releaseCommBuffer() { |
| ByteBuffer bb = this._commBuffer; |
| if (bb != null) { |
| this._commBuffer = null; |
| ServerConnection.releaseCommBuffer(bb); |
| } |
| } |
| |
| private void closeNonDurableCqs(){ |
| CqService cqService = getCache().getCqService(); |
| if (cqService != null) { |
| try { |
| cqService.closeNonDurableClientCqs(getProxyID()); |
| } |
| catch (CqException ex) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_CQEXCEPTION_WHILE_CLOSING_NON_DURABLE_CQS_0, ex.getLocalizedMessage())); |
| } |
| } |
| } |
| |
| private void destroyRQ() { |
| if (this._messageDispatcher == null) { |
| return; |
| } |
| try { |
| // Using Destroy Region bcoz this method is modified in HARegion so as |
| // not to distribute. |
| // For normal Regions , even the localDestroyRegion actually propagates |
| HARegionQueue rq = this._messageDispatcher._messageQueue; |
| rq.destroy(); |
| |
| // if (!rq.getRegion().isDestroyed()) { |
| // rq.getRegion().destroyRegion(); |
| // } |
| } |
| catch (RegionDestroyedException rde) { |
| // throw rde; |
| } |
| catch (CancelException e) { |
| // throw e; |
| } |
| catch (Exception warning) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_EXCEPTION_IN_CLOSING_THE_UNDERLYING_HAREGION_OF_THE_HAREGIONQUEUE, this), warning); |
| } |
| } |
| |
| public void registerInterestRegex(String regionName, String regex, |
| boolean isDurable) { |
| registerInterestRegex(regionName, regex, isDurable, true); |
| } |
| |
| /** |
|  * Registers interest in a regular expression on the given region and notifies |
|  * the secondaries and the client of the change. |
|  * |
|  * @throws IllegalStateException if this proxy is not the primary |
|  */ |
| public void registerInterestRegex(String regionName, String regex, |
|     boolean isDurable, boolean receiveValues) { |
|   if (!this.isPrimary) { |
|     throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString()); |
|   } |
|   // Notify all secondaries and client of change in interest |
|   notifySecondariesAndClient(regionName, regex, InterestResultPolicy.NONE, |
|       isDurable, receiveValues, InterestType.REGULAR_EXPRESSION); |
| } |
| |
| /** |
|  * Registers interest in a key (or list of keys), delegating to the |
|  * five-argument overload with {@code receiveValues} set to {@code true}. |
|  */ |
| public void registerInterest(String regionName, Object keyOfInterest, |
|     InterestResultPolicy policy, boolean isDurable) { |
|   final boolean receiveValues = true; |
|   registerInterest(regionName, keyOfInterest, policy, isDurable, receiveValues); |
| } |
| |
| public void registerInterest(String regionName, Object keyOfInterest, |
| InterestResultPolicy policy, boolean isDurable, |
| boolean receiveValues) { |
| if (keyOfInterest instanceof String && keyOfInterest.equals("ALL_KEYS")) { |
| registerInterestRegex(regionName, ".*", isDurable, receiveValues); |
| } else if (keyOfInterest instanceof List) { |
| if (this.isPrimary) { |
| notifySecondariesAndClient(regionName, keyOfInterest, policy, |
| isDurable, receiveValues, InterestType.KEY); |
| } else { |
| throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString()); |
| } |
| } else { |
| if (this.isPrimary) { |
| // Notify all secondaries and client of change in interest |
| notifySecondariesAndClient(regionName, keyOfInterest, policy, |
| isDurable, receiveValues, InterestType.KEY); |
| |
| // Enqueue the initial value message for the client if necessary |
| if (policy == InterestResultPolicy.KEYS_VALUES) { |
| Get70 request = (Get70)Get70.getCommand(); |
| LocalRegion lr = (LocalRegion) this._cache.getRegion(regionName); |
| Get70.Entry entry = request.getValueAndIsObject(lr, keyOfInterest, null, |
| null); |
| boolean isObject = entry.isObject; |
| byte[] value = null; |
| if (entry.value instanceof byte[]) { |
| value = (byte[])entry.value; |
| } else { |
| try { |
| value = CacheServerHelper.serialize(entry.value); |
| } catch (IOException e) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_THE_FOLLOWING_EXCEPTION_OCCURRED_0, entry.value), e); |
| } |
| } |
| VersionTag tag = entry.versionTag; |
| ClientUpdateMessage updateMessage = new ClientUpdateMessageImpl( |
| EnumListenerEvent.AFTER_CREATE, lr, keyOfInterest, value, null, |
| (isObject ? (byte) 0x01 : (byte) 0x00), null, this.proxyID, |
| new EventID(this._cache.getDistributedSystem()), tag); |
| CacheClientNotifier.routeSingleClientMessage(updateMessage, this.proxyID); |
| } |
| // Add the client to the region's filters |
| //addFilterRegisteredClients(regionName, keyOfInterest); |
| } else { |
| throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString()); |
| } |
| } |
| } |
| |
| private void notifySecondariesAndClient(String regionName, |
| Object keyOfInterest, InterestResultPolicy policy, boolean isDurable, |
| boolean receiveValues, int interestType) { |
| // Create a client interest message for the keyOfInterest |
| ClientInterestMessageImpl message = new ClientInterestMessageImpl( |
| new EventID(this._cache.getDistributedSystem()), regionName, |
| keyOfInterest, interestType, policy.getOrdinal(), isDurable, |
| !receiveValues, ClientInterestMessageImpl.REGISTER); |
| |
| // Notify all secondary proxies of a change in interest |
| notifySecondariesOfInterestChange(message); |
| |
| // Modify interest registration |
| if (keyOfInterest instanceof List) { |
| registerClientInterestList(regionName, (List) keyOfInterest, isDurable, |
| !receiveValues, true); |
| } else { |
| registerClientInterest(regionName, keyOfInterest, interestType, |
| isDurable, !receiveValues, true); |
| } |
| |
| // Enqueue the interest registration message for the client. |
| // If the client is not 7.0.1 or greater and the key of interest is a list, |
| // then create an individual message for each entry in the list since the |
| // client doesn't support a ClientInterestMessageImpl containing a list. |
| if (Version.GFE_701.compareTo(this.clientVersion) > 0 |
| && keyOfInterest instanceof List) { |
| for (Iterator i = ((List) keyOfInterest).iterator(); i.hasNext();) { |
| this._messageDispatcher.enqueueMessage(new ClientInterestMessageImpl( |
| new EventID(this._cache.getDistributedSystem()), regionName, |
| i.next(), interestType, policy.getOrdinal(), isDurable, !receiveValues, |
| ClientInterestMessageImpl.REGISTER)); |
| } |
| } else { |
| this._messageDispatcher.enqueueMessage(message); |
| } |
| } |
| |
| public void unregisterInterestRegex(String regionName, String regex, |
| boolean isDurable) { |
| unregisterInterestRegex(regionName, regex, isDurable, true); |
| } |
| |
| /** |
|  * Unregisters interest in a regular expression on the given region and |
|  * notifies the secondaries and the client of the change. |
|  * |
|  * @throws IllegalStateException if this proxy is not the primary |
|  */ |
| public void unregisterInterestRegex(String regionName, String regex, |
|     boolean isDurable, boolean receiveValues) { |
|   if (!this.isPrimary) { |
|     throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString()); |
|   } |
|   notifySecondariesAndClient(regionName, regex, isDurable, receiveValues, |
|       InterestType.REGULAR_EXPRESSION); |
| } |
| |
| public void unregisterInterest(String regionName, Object keyOfInterest, |
| boolean isDurable) { |
| unregisterInterest(regionName, keyOfInterest, isDurable, true); |
| } |
| |
| /** |
|  * Unregisters interest in a key or list of keys. The special key |
|  * {@code "ALL_KEYS"} is translated into a {@code ".*"} regex unregistration. |
|  * |
|  * @throws IllegalStateException if this proxy is not the primary |
|  */ |
| public void unregisterInterest(String regionName, Object keyOfInterest, |
|     boolean isDurable, boolean receiveValues) { |
|   if ("ALL_KEYS".equals(keyOfInterest)) { |
|     // the regex overload performs its own primary check |
|     unregisterInterestRegex(regionName, ".*", isDurable, receiveValues); |
|     return; |
|   } |
|   if (!this.isPrimary) { |
|     throw new IllegalStateException(LocalizedStrings.CacheClientProxy_NOT_PRIMARY.toLocalizedString()); |
|   } |
|   notifySecondariesAndClient(regionName, keyOfInterest, isDurable, |
|       receiveValues, InterestType.KEY); |
| } |
| |
| private void notifySecondariesAndClient(String regionName, |
| Object keyOfInterest, boolean isDurable, boolean receiveValues, |
| int interestType) { |
| // Notify all secondary proxies of a change in interest |
| ClientInterestMessageImpl message = new ClientInterestMessageImpl( |
| new EventID(this._cache.getDistributedSystem()), regionName, |
| keyOfInterest, interestType, (byte) 0, isDurable, !receiveValues, |
| ClientInterestMessageImpl.UNREGISTER); |
| notifySecondariesOfInterestChange(message); |
| |
| // Modify interest registration |
| if (keyOfInterest instanceof List) { |
| unregisterClientInterest(regionName, (List) keyOfInterest, false); |
| } else { |
| unregisterClientInterest(regionName, keyOfInterest, interestType, |
| false); |
| } |
| |
| // Enqueue the interest unregistration message for the client. |
| // If the client is not 7.0.1 or greater and the key of interest is a list, |
| // then create an individual message for each entry in the list since the |
| // client doesn't support a ClientInterestMessageImpl containing a list. |
| if (Version.GFE_701.compareTo(this.clientVersion) > 0 |
| && keyOfInterest instanceof List) { |
| for (Iterator i = ((List) keyOfInterest).iterator(); i.hasNext();) { |
| this._messageDispatcher.enqueueMessage(new ClientInterestMessageImpl( |
| new EventID(this._cache.getDistributedSystem()), regionName, |
| i.next(), interestType, (byte) 0, isDurable, !receiveValues, |
| ClientInterestMessageImpl.UNREGISTER)); |
| } |
| } else { |
| this._messageDispatcher.enqueueMessage(message); |
| } |
| } |
| |
| protected void notifySecondariesOfInterestChange(ClientInterestMessageImpl message) { |
| if (logger.isDebugEnabled()) { |
| StringBuffer subBuffer = new StringBuffer(); |
| if (message.isRegister()) { |
| subBuffer |
| .append("register ") |
| .append(message.getIsDurable() ? "" : "non-") |
| .append("durable interest in "); |
| } else { |
| subBuffer.append("unregister interest in "); |
| } |
| StringBuffer buffer = new StringBuffer(); |
| buffer |
| .append(this) |
| .append(": Notifying secondary proxies to ") |
| .append(subBuffer.toString()) |
| .append(message.getRegionName()) |
| .append("->") |
| .append(message.getKeyOfInterest()) |
| .append("->") |
| .append(InterestType.getString(message.getInterestType())); |
| logger.debug(buffer.toString()); |
| } |
| this._cacheClientNotifier.deliverInterestChange(this.proxyID, message); |
| } |
| |
| /* |
| protected void addFilterRegisteredClients(String regionName, |
| Object keyOfInterest) { |
| try { |
| this._cacheClientNotifier.addFilterRegisteredClients(regionName, |
| this.proxyID); |
| } catch (RegionDestroyedException e) { |
| logger.warn(LocalizedStrings.CacheClientProxy_0_INTEREST_REG_FOR_0_FAILED, regionName + "->" + keyOfInterest, e); |
| } |
| } |
| */ |
| |
| /** |
| * Registers interest in the input region name and key |
| * |
| * @param regionName |
| * The fully-qualified name of the region in which to register |
| * interest |
| * @param keyOfInterest |
| * The key in which to register interest |
| */ |
| protected void registerClientInterest(String regionName, |
| Object keyOfInterest, int interestType, boolean isDurable, |
| boolean sendUpdatesAsInvalidates, boolean flushState) |
| { |
| ClientInterestList cil = |
| this.cils[RegisterInterestTracker.getInterestLookupIndex( |
| isDurable, false)]; |
| cil.registerClientInterest(regionName, keyOfInterest, interestType, sendUpdatesAsInvalidates); |
| if (flushState) { |
| flushForInterestRegistration(regionName, this._cache.getDistributedSystem().getDistributedMember()); |
| } |
| HARegionQueue queue = getHARegionQueue(); |
| if (queue != null) { // queue is null during initialization |
| queue.setHasRegisteredInterest(true); |
| } |
| } |
| |
| /** |
|  * Flushes other members' state for the named region to the given target. The |
|  * target is usually the member that is registering the interest; during queue |
|  * creation it is the queue's image provider. |
|  * <p> |
|  * Nothing happens when the region cannot be found or its scope is not |
|  * distributed. If the flush is interrupted, the thread's interrupt flag is |
|  * restored and the method returns. |
|  * |
|  * @param regionName name of the region to flush |
|  * @param target     member to which the state is flushed |
|  */ |
| public void flushForInterestRegistration(String regionName, DistributedMember target) { |
|   final Region region = this._cache.getRegion(regionName); |
|   if (region == null) { |
|     if (logger.isDebugEnabled()) { |
|       logger.debug("Unable to find region '{}' to flush for interest registration", regionName); |
|     } |
|     return; |
|   } |
|   if (!region.getAttributes().getScope().isDistributed()) { |
|     return; |
|   } |
| |
|   if (logger.isDebugEnabled()) { |
|     logger.debug("Flushing region '{}' for interest registration", regionName); |
|   } |
|   final CacheDistributionAdvisee advisee = (CacheDistributionAdvisee) region; |
| |
|   // For a partitioned region all buckets need flushing; the operation has no |
|   // option to target only the buckets of one PR, so it is created from the |
|   // distribution manager instead of from the region. |
|   final StateFlushOperation flushOp = (region instanceof PartitionedRegion) |
|       ? new StateFlushOperation(this._cache.getDistributedSystem().getDistributionManager()) |
|       : new StateFlushOperation((DistributedRegion) region); |
| |
|   try { |
|     // bug 41681 - flush any member that may have a cache operation in |
|     // progress so that the changes are received there before returning |
|     final InitialImageAdvice advice = advisee.getCacheDistributionAdvisor().adviseInitialImage(null); |
|     final HashSet recipients = new HashSet(advice.getReplicates()); |
|     recipients.addAll(advice.getUninitialized()); |
|     recipients.addAll(advice.getEmpties()); |
|     recipients.addAll(advice.getPreloaded()); |
|     recipients.addAll(advice.getOthers()); |
|     flushOp.flush(recipients, target, DistributionManager.HIGH_PRIORITY_EXECUTOR, true); |
|   } catch (InterruptedException interrupted) { |
|     Thread.currentThread().interrupt(); |
|   } |
| } |
| |
| /** |
| * Unregisters interest in the input region name and key |
| * |
| * @param regionName |
| * The fully-qualified name of the region in which to unregister |
| * interest |
| * @param keyOfInterest |
| * The key in which to unregister interest |
| * @param isClosing |
| * Whether the caller is closing |
| */ |
| protected void unregisterClientInterest(String regionName, |
| Object keyOfInterest, int interestType, boolean isClosing) |
| { |
| // only unregister durable interest if isClosing and !keepalive |
| if (!isClosing /* explicit unregister */ |
| || !getDurableKeepAlive() /* close and no keepAlive*/) { |
| this.cils[RegisterInterestTracker.durableInterestListIndex]. |
| unregisterClientInterest(regionName, keyOfInterest, interestType); |
| } |
| // always unregister non durable interest |
| this.cils[RegisterInterestTracker.interestListIndex]. |
| unregisterClientInterest(regionName, keyOfInterest, interestType); |
| } |
| |
| /** |
| * Registers interest in the input region name and list of keys |
| * |
| * @param regionName |
| * The fully-qualified name of the region in which to register |
| * interest |
| * @param keysOfInterest |
| * The list of keys in which to register interest |
| */ |
| protected void registerClientInterestList(String regionName, |
| List keysOfInterest, boolean isDurable, boolean sendUpdatesAsInvalidates, |
| boolean flushState) |
| { |
| // we only use two interest lists to map the non-durable and durable |
| // identifiers to their interest settings |
| ClientInterestList cil = |
| this.cils[RegisterInterestTracker.getInterestLookupIndex( |
| isDurable, false/*sendUpdatesAsInvalidates*/)]; |
| cil.registerClientInterestList(regionName, keysOfInterest, sendUpdatesAsInvalidates); |
| if (getHARegionQueue() != null) { |
| if (flushState) { |
| flushForInterestRegistration(regionName, this._cache.getDistributedSystem().getDistributedMember()); |
| } |
| getHARegionQueue().setHasRegisteredInterest(true); |
| } |
| } |
| |
| /** |
| * Unregisters interest in the input region name and list of keys |
| * |
| * @param regionName |
| * The fully-qualified name of the region in which to unregister |
| * interest |
| * @param keysOfInterest |
| * The list of keys in which to unregister interest |
| * @param isClosing |
| * Whether the caller is closing |
| */ |
| protected void unregisterClientInterest(String regionName, |
| List keysOfInterest, boolean isClosing) |
| { |
| // only unregister durable interest if isClosing and !keepalive |
| if (!isClosing /* explicit unregister */ |
| || !getDurableKeepAlive() /* close and no keepAlive*/) { |
| this.cils[RegisterInterestTracker.durableInterestListIndex]. |
| unregisterClientInterestList(regionName, keysOfInterest); |
| } |
| // always unregister non durable interest |
| this.cils[RegisterInterestTracker.interestListIndex]. |
| unregisterClientInterestList(regionName, keysOfInterest); |
| } |
| |
| |
| /** sent by the cache client notifier when there is an interest registration change */ |
| protected void processInterestMessage(ClientInterestMessageImpl message) { |
| int interestType = message.getInterestType(); |
| String regionName = message.getRegionName(); |
| Object key = message.getKeyOfInterest(); |
| if (message.isRegister()) { |
| // Register interest in this region->key |
| if (key instanceof List) { |
| registerClientInterestList(regionName, (List) key, |
| message.getIsDurable(), message.getForUpdatesAsInvalidates(), true); |
| } else { |
| registerClientInterest(regionName, key, interestType, |
| message.getIsDurable(), message.getForUpdatesAsInvalidates(), true); |
| } |
| |
| // Add the client to the region's filters |
| //addFilterRegisteredClients(regionName, key); |
| |
| if (logger.isDebugEnabled()) { |
| StringBuffer buffer = new StringBuffer(); |
| buffer |
| .append(this) |
| .append(": Interest listener registered ") |
| .append(message.getIsDurable() ? "" : "non-") |
| .append("durable interest in ") |
| .append(message.getRegionName()) |
| .append("->") |
| .append(message.getKeyOfInterest()) |
| .append("->") |
| .append(InterestType.getString(message.getInterestType())); |
| logger.debug(buffer.toString()); |
| } |
| } else { |
| // Unregister interest in this region->key |
| if (key instanceof List) { |
| unregisterClientInterest(regionName, (List) key, false); |
| } else { |
| unregisterClientInterest(regionName, key, interestType, false); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| StringBuffer buffer = new StringBuffer(); |
| buffer |
| .append(this) |
| .append(": Interest listener unregistered interest in ") |
| .append(message.getRegionName()) |
| .append("->") |
| .append(message.getKeyOfInterest()) |
| .append("->") |
| .append(InterestType.getString(message.getInterestType())); |
| logger.debug(buffer.toString()); |
| } |
| } |
| } |
| |
| /** |
|  * Runs the post-process authorization check on a message before it is added |
|  * to the queue for dispatching. |
|  * <p> |
|  * Two modes are handled: |
|  * <ul> |
|  * <li>Authentication required, no proxy-level callback, but a post-authz |
|  * callback present (multiuser mode): each CQ attached to the message for this |
|  * proxy is authorized individually and unauthorized CQs are removed from the |
|  * message. If no CQ remains, the message is rejected.</li> |
|  * <li>A proxy-level post-authz callback is set: the whole message is |
|  * authorized through it.</li> |
|  * </ul> |
|  * In every other case the message passes. |
|  * |
|  * @param clientMessage the update message about to be queued |
|  * @return {@code false} if the message must not be queued; {@code true} |
|  *         otherwise |
|  */ |
| private boolean postDeliverAuthCheckPassed(ClientUpdateMessage clientMessage) { |
|   // Before adding it in the queue for dispatching, check for post |
|   // process authorization |
|   if (AcceptorImpl.isAuthenticationRequired() |
|       && this.postAuthzCallback == null |
|       && AcceptorImpl.isPostAuthzCallbackPresent()) { |
|     // security is on and callback is null: it means multiuser mode. |
|     ClientUpdateMessageImpl cumi = (ClientUpdateMessageImpl)clientMessage; |
| |
|     CqNameToOp clientCq = cumi.getClientCq(this.proxyID); |
| |
|     if (clientCq != null && !clientCq.isEmpty()) { |
|       if (logger.isDebugEnabled()) { |
|         logger.debug("CCP clientCq size before processing auth {}", clientCq.size()); |
|       } |
|       String[] regionNameHolder = new String[1]; |
|       OperationContext opctxt = getOperationContext(clientMessage, |
|           regionNameHolder); |
|       if (opctxt == null) { |
|         logger.warn(LocalizedMessage.create( |
|             LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_THE_OPERATION_CONTEXT_OBJECT_COULD_NOT_BE_OBTAINED_FOR_THIS_CLIENT_MESSAGE, |
|             new Object[] {this, clientMessage})); |
|         return false; |
|       } |
| |
|       String[] cqNames = clientCq.getNames(); |
|       if (logger.isDebugEnabled()) { |
|         logger.debug("CCP clientCq names array size {}", cqNames.length); |
|       } |
|       for (int i = 0; i < cqNames.length; i++) { |
|         try { |
|           if (logger.isDebugEnabled()) { |
|             logger.debug("CCP clientCq name {}", cqNames[i]); |
|           } |
|           boolean isAuthorized = false; |
| |
|           if (this.proxyID.isDurable() && this.getDurableKeepAlive() |
|               && this._isPaused) { |
|             // need to take lock as we may be reinitializing proxy cache |
|             synchronized (this.clientUserAuthsLock) { |
|               AuthorizeRequestPP postAuthCallback = this.clientUserAuths |
|                   .getUserAuthAttributes(cqNames[i]).getPostAuthzRequest(); |
|               if (logger.isDebugEnabled() && postAuthCallback == null) { |
|                 logger.debug("CCP clientCq post callback is null"); |
|               } |
|               if (postAuthCallback != null && postAuthCallback |
|                   .getPostAuthzCallback().authorizeOperation( |
|                       regionNameHolder[0], opctxt)) { |
|                 isAuthorized = true; |
|               } |
|             } |
|           } else { |
|             UserAuthAttributes userAuthAttributes = this.clientUserAuths |
|                 .getUserAuthAttributes(cqNames[i]); |
| |
|             AuthorizeRequestPP postAuthCallback = userAuthAttributes |
|                 .getPostAuthzRequest(); |
|             if (postAuthCallback == null && logger.isDebugEnabled()) { |
|               logger.debug("CCP clientCq post callback is null"); |
|             } |
|             if (postAuthCallback != null && postAuthCallback |
|                 .getPostAuthzCallback().authorizeOperation( |
|                     regionNameHolder[0], opctxt)) { |
|               isAuthorized = true; |
|             } |
|           } |
| |
|           if (!isAuthorized) { |
|             logger.warn(LocalizedMessage.create( |
|                 LocalizedStrings.CacheClientProxy__0_NOT_ADDING_CQ_MESSAGE_TO_QUEUE_1_BECAUSE_AUTHORIZATION_FAILED, |
|                 new Object[] {this, clientMessage})); |
|             clientCq.delete(cqNames[i]); |
|           } |
|         } catch (Exception ex) { |
|           // Previously swallowed without a trace; now recorded at debug level. |
|           // Behaviour is unchanged: the CQ stays attached to the message. |
|           // NOTE(review): this means a failing check lets the CQ through - |
|           // confirm that this is the intended outcome. |
|           if (logger.isDebugEnabled()) { |
|             logger.debug("{}: Exception while authorizing CQ {}; leaving it attached to the message", this, cqNames[i], ex); |
|           } |
|         } |
|         if (logger.isDebugEnabled()) { |
|           logger.debug("CCP clientCq size after processing auth {}", clientCq.size()); |
|         } |
|       } |
|       // again need to check as there may be no CQ available |
|       if (!clientMessage.hasCqs(this.proxyID)) { |
|         this._statistics.incMessagesNotQueuedNotInterested(); |
|         if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) { |
|           // fix: the format has two placeholders, so the proxy must be passed as well |
|           logger.debug("{}: Not adding message to queue. It is not interested in this region and key: {}", this, clientMessage); |
|         } |
|         return false; |
|       } |
|     } |
|   } |
|   else if (this.postAuthzCallback != null) { |
|     String[] regionNameHolder = new String[1]; |
|     boolean isAuthorize = false; |
|     OperationContext opctxt = getOperationContext(clientMessage, |
|         regionNameHolder); |
|     if (opctxt == null) { |
|       logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_THE_OPERATION_CONTEXT_OBJECT_COULD_NOT_BE_OBTAINED_FOR_THIS_CLIENT_MESSAGE, new Object[] {this, clientMessage})); |
|       return false; |
|     } |
|     if (logger.isTraceEnabled()){ |
|       logger.trace("{}: Invoking authorizeOperation for message: {}", this, clientMessage); |
|     } |
| |
|     // For a paused durable proxy with keep-alive the callback is invoked while |
|     // holding clientUserAuthsLock; otherwise it is invoked without the lock. |
|     if (this.proxyID.isDurable() && this.getDurableKeepAlive() |
|         && this._isPaused) { |
|       synchronized (this.clientUserAuthsLock) { |
|         isAuthorize = this.postAuthzCallback.authorizeOperation( |
|             regionNameHolder[0], opctxt); |
|       } |
|     } else { |
|       isAuthorize = this.postAuthzCallback.authorizeOperation( |
|           regionNameHolder[0], opctxt); |
|     } |
|     if (!isAuthorize) { |
|       logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy__0_NOT_ADDING_MESSAGE_TO_QUEUE_1_BECAUSE_AUTHORIZATION_FAILED, new Object[] {this, clientMessage})); |
|       return false; |
|     } |
|   } |
| |
|   return true; |
| } |
| |
| /** |
| * Delivers the message to the client representing this client proxy. |
| * @param conflatable |
| */ |
| protected void deliverMessage(Conflatable conflatable) |
| { |
| ClientUpdateMessage clientMessage = null; |
| if(conflatable instanceof HAEventWrapper) { |
| clientMessage = ((HAEventWrapper)conflatable).getClientUpdateMessage(); |
| } else { |
| clientMessage = (ClientUpdateMessage)conflatable; |
| } |
| |
| this._statistics.incMessagesReceived(); |
| |
| if (clientMessage.needsNoAuthorizationCheck() || postDeliverAuthCheckPassed(clientMessage)) { |
| // If dispatcher is getting initialized, add the event to temporary queue. |
| if (this.messageDispatcherInit) { |
| synchronized (this.queuedEventsSync) { |
| if (this.messageDispatcherInit) { // Check to see value did not changed while getting the synchronize lock. |
| if (logger.isDebugEnabled()) { |
| logger.debug("Message dispatcher for proxy {} is getting initialized. Adding message to the queuedEvents.", this); |
| } |
| this.queuedEvents.add(conflatable); |
| return; |
| } |
| } |
| } |
| |
| if (this._messageDispatcher != null) { |
| this._messageDispatcher.enqueueMessage(conflatable); |
| } else { |
| this._statistics.incMessagesFailedQueued(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Message is not added to the queue. Message dispatcher for proxy: {} doesn't exist.", this); |
| } |
| } |
| } else { |
| this._statistics.incMessagesFailedQueued(); |
| } |
| } |
| |
| protected void sendMessageDirectly(ClientMessage message) { |
| // Send the message directly if the connection exists |
| // (do not go through the queue). |
| if (logger.isDebugEnabled()){ |
| logger.debug("About to send message directly to {}", this); |
| } |
| if (this._messageDispatcher != null && this._socket != null && !this._socket.isClosed()) { |
| // If the socket is open, send the message to it |
| this._messageDispatcher.sendMessageDirectly(message); |
| if (logger.isDebugEnabled()){ |
| logger.debug("Sent message directly to {}", this); |
| } |
| } else { |
| // Otherwise just reset the ping counter |
| resetPingCounter(); |
| if (logger.isDebugEnabled()){ |
| logger.debug("Skipped sending message directly to {}", this); |
| } |
| } |
| } |
| |
| private OperationContext getOperationContext(ClientMessage cmsg, |
| String[] regionNameHolder) { |
| ClientUpdateMessageImpl cmsgimpl = (ClientUpdateMessageImpl)cmsg; |
| OperationContext opctxt = null; |
| // TODO SW: Special handling for DynamicRegions; this should be reworked |
| // when DynamicRegion API is deprecated |
| String regionName = cmsgimpl.getRegionName(); |
| regionNameHolder[0] = regionName; |
| if (cmsgimpl.isCreate()) { |
| if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { |
| regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest(); |
| opctxt = new RegionCreateOperationContext(true); |
| } |
| else { |
| PutOperationContext tmp = new PutOperationContext(cmsgimpl.getKeyOfInterest(), cmsgimpl |
| .getValue(), cmsgimpl.valueIsObject(), PutOperationContext.CREATE, |
| true); |
| tmp.setCallbackArg(cmsgimpl.getCallbackArgument()); |
| opctxt = tmp; |
| } |
| } |
| else if (cmsgimpl.isUpdate()) { |
| if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { |
| regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest(); |
| opctxt = new RegionCreateOperationContext(true); |
| } |
| else { |
| PutOperationContext tmp = new PutOperationContext(cmsgimpl.getKeyOfInterest(), cmsgimpl |
| .getValue(), cmsgimpl.valueIsObject(), PutOperationContext.UPDATE, |
| true); |
| tmp.setCallbackArg(cmsgimpl.getCallbackArgument()); |
| opctxt = tmp; |
| } |
| } |
| else if (cmsgimpl.isDestroy()) { |
| if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { |
| regionNameHolder[0] = (String)cmsgimpl.getKeyOfInterest(); |
| opctxt = new RegionDestroyOperationContext(true); |
| } |
| else { |
| DestroyOperationContext tmp = new DestroyOperationContext(cmsgimpl.getKeyOfInterest(), true); |
| tmp.setCallbackArg(cmsgimpl.getCallbackArgument()); |
| opctxt = tmp; |
| } |
| } |
| else if (cmsgimpl.isDestroyRegion()) { |
| opctxt = new RegionDestroyOperationContext(true); |
| } |
| else if (cmsgimpl.isInvalidate()) { |
| InvalidateOperationContext tmp = new InvalidateOperationContext(cmsgimpl.getKeyOfInterest(), true); |
| tmp.setCallbackArg(cmsgimpl.getCallbackArgument()); |
| opctxt = tmp; |
| } |
| else if (cmsgimpl.isClearRegion()) { |
| RegionClearOperationContext tmp = new RegionClearOperationContext(true); |
| tmp.setCallbackArg(cmsgimpl.getCallbackArgument()); |
| opctxt = tmp; |
| } |
| return opctxt; |
| } |
| |
| /** |
|  * Initializes the message dispatcher thread. The {@code MessageDispatcher} |
|  * processes the message queue. |
|  * <p> |
|  * While initialization is in progress ({@code messageDispatcherInit} is true) |
|  * incoming events are parked in {@code queuedEvents}; they are moved into the |
|  * new dispatcher here. If initialization does not complete, the statistics |
|  * are closed. |
|  * |
|  * @throws CacheException |
|  */ |
| public void initializeMessageDispatcher() throws CacheException |
| { |
|   this.messageDispatcherInit = true; // Initialization process. |
|   try { |
|     if (logger.isDebugEnabled()) { |
|       logger.debug("{}: Initializing message dispatcher with capacity of {} entries", this, _maximumMessageCount); |
|     } |
|     String dispatcherName = "Client Message Dispatcher for " + getProxyID().getDistributedMember(); |
|     if (isDurable()) { |
|       dispatcherName += " (" + getDurableId() + ")"; |
|     } |
|     this._messageDispatcher = new MessageDispatcher(this, dispatcherName); |
| |
|     // Fix for 41375 - drain as many of the queued events |
|     // as we can without synchronization. |
|     if (logger.isDebugEnabled()) { |
|       logger.debug("{} draining {} events from init queue into intialized queue", this, this.queuedEvents.size()); |
|     } |
|     for (Conflatable pending = queuedEvents.poll(); pending != null; pending = queuedEvents.poll()) { |
|       this._messageDispatcher.enqueueMessage(pending); |
|     } |
| |
|     // Now finish emptying the queue with synchronization to make |
|     // sure we don't miss any events. |
|     synchronized (this.queuedEventsSync) { |
|       for (Conflatable pending = queuedEvents.poll(); pending != null; pending = queuedEvents.poll()) { |
|         this._messageDispatcher.enqueueMessage(pending); |
|       } |
|       this.messageDispatcherInit = false; // Done initialization. |
|     } |
|   } finally { |
|     // The flag is still set only if initialization did not complete. |
|     if (this.messageDispatcherInit) { |
|       this._statistics.close(); |
|     } |
|   } |
| } |
| |
| protected void startOrResumeMessageDispatcher(boolean processedMarker) { |
| // Only start or resume the dispatcher if it is Primary |
| if (this.isPrimary) { |
| // Add the marker to the queue |
| if (!processedMarker) { |
| EventID eventId = new EventID(this._cache.getDistributedSystem()); |
| this._messageDispatcher.enqueueMarker(new ClientMarkerMessageImpl(eventId)); |
| } |
| |
| // Set the message queue to primary. |
| this._messageDispatcher._messageQueue.setPrimary(true); |
| |
| // Start or resume the dispatcher |
| synchronized (this._messageDispatcher._pausedLock) { |
| if (this.isPaused()) { |
| // It is paused, resume it |
| this.setPaused(false); |
| if (this._messageDispatcher.isStopped()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Starting dispatcher", this); |
| } |
| this._messageDispatcher.start(); |
| } |
| else { |
| // ARB: Initialize transient fields. |
| this._messageDispatcher.initializeTransients(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Resuming dispatcher", this); |
| } |
| this._messageDispatcher.resumeDispatching(); |
| } |
| } else if (!this._messageDispatcher.isAlive()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Starting dispatcher", this); |
| } |
| this._messageDispatcher.start(); |
| } |
| } |
| } |
| } |
| |
| /** |
|  * Returns whether the client represented by this {@code CacheClientProxy} |
|  * has registered interest in anything. |
|  * |
|  * @return {@code true} if the non-durable or the durable interest list |
|  *         reports interest |
|  */ |
| protected boolean hasRegisteredInterested() |
| { |
|   if (this.cils[RegisterInterestTracker.interestListIndex].hasInterest()) { |
|     return true; |
|   } |
|   return this.cils[RegisterInterestTracker.durableInterestListIndex].hasInterest(); |
| } |
| |
| /** |
| * Returns a string representation of the proxy |
| */ |
| @Override |
| public String toString() |
| { |
| StringBuffer buffer = new StringBuffer(); |
| buffer.append("CacheClientProxy[") |
| // .append("client proxy id=") |
| .append(this.proxyID) |
| // .append("; client host name=") |
| // .append(this._socket.getInetAddress().getCanonicalHostName()) |
| // .append("; client host address=") |
| // .append(this._remoteHostAddress) |
| .append("; port=").append(this._socket.getPort()) |
| .append("; primary=").append(isPrimary) |
| .append("; version=").append(clientVersion) |
| .append("]"); |
| return buffer.toString(); |
| } |
| |
| public boolean isPrimary() |
| { |
| //boolean primary = this._messageDispatcher.isAlive() |
| // || this._messageDispatcher._messageQueue.isPrimary(); |
| boolean primary = this.isPrimary; |
| //System.out.println(this + ": DISPATCHER IS ALIVE: " + this._messageDispatcher.isAlive()); |
| //System.out.println(this + ": DISPATCHER QUEUE IS PRIMARY: " + this._messageDispatcher._messageQueue.isPrimary()); |
| //System.out.println(this + ": IS PRIMARY: " + primary); |
| return primary; |
| // return this.isPrimary ; |
| } |
| |
| protected boolean basicIsPrimary() { |
| return this.isPrimary; |
| } |
| |
| protected void setPrimary(boolean isPrimary) { |
| this.isPrimary = isPrimary; |
| } |
| |
| // private static int nextId = 0; |
| // static protected int getNextId() { |
| // synchronized (CacheClientProxy.class) { |
| // return ++nextId; |
| // } |
| // } |
| /* |
| * Return this client's HA region queue |
| * @returns - HARegionQueue of the client |
| */ |
| public HARegionQueue getHARegionQueue() { |
| if (this._messageDispatcher != null){ |
| return _messageDispatcher._messageQueue; |
| } |
| return null; |
| } |
| |
| |
| /** |
| * Reinitialize a durable <code>CacheClientProxy</code> with a new client. |
| * @param socket |
| * The socket between the server and the client |
| * @param ip |
| * whether this proxy represents the primary |
| */ |
| protected void reinitialize(Socket socket, ClientProxyMembershipID proxyId, |
| Cache cache, boolean ip, byte cc, Version ver) { |
| // Re-initialize transient fields |
| initializeTransientFields(socket, proxyId, ip, cc, ver); |
| getCacheClientNotifier().getAcceptorStats().incCurrentQueueConnections(); |
| |
| |
| // Cancel expiration task |
| cancelDurableExpirationTask(true); |
| |
| // Set the message dispatcher's primary flag. This could go from primary |
| // to secondary |
| this._messageDispatcher._messageQueue.setPrimary(ip); |
| this._messageDispatcher._messageQueue.setClientConflation(cc); |
| |
| reinitializeClientAuths(); |
| this.creationDate = new Date(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Has been reinitialized", this); |
| } |
| } |
| |
| protected boolean isDurable() { |
| return getProxyID().isDurable(); |
| } |
| |
| protected String getDurableId() { |
| return getProxyID().getDurableId(); |
| } |
| |
| protected int getDurableTimeout() { |
| return getProxyID().getDurableTimeout(); |
| } |
| |
| private boolean getDurableKeepAlive() { |
| return this.keepalive; |
| } |
| |
| protected String getHARegionName() { |
| return getProxyID().getHARegionName(); |
| } |
| |
| public Region getHARegion() { |
| return this._messageDispatcher._messageQueue.getRegion(); |
| } |
| |
| public Version getVersion() { |
| return this.clientVersion; |
| } |
| |
| protected void scheduleDurableExpirationTask() { |
| SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() { |
| @Override |
| public void run2() { |
| _durableExpirationTask.compareAndSet(this, null); |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0__THE_EXPIRATION_TASK_HAS_FIRED_SO_THIS_PROXY_IS_BEING_TERMINATED, CacheClientProxy.this)); |
| // Remove the proxy from the CacheClientNofier's registry |
| getCacheClientNotifier().removeClientProxy(CacheClientProxy.this); |
| getCacheClientNotifier().durableClientTimedOut(CacheClientProxy.this.proxyID); |
| |
| // Close the proxy |
| terminateDispatching(false); |
| _cacheClientNotifier._statistics.incQueueDroppedCount(); |
| |
| /** |
| * Setting the expiration task to null again and cancelling existing |
| * one, if any. See #50894. |
| * <p/> |
| * The message dispatcher may again set the expiry task in below path: |
| * <code> |
| * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.scheduleDurableExpirationTask(CacheClientProxy.java:2020) |
| * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.pauseDispatching(CacheClientProxy.java:924) |
| * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.pauseOrUnregisterProxy(CacheClientProxy.java:2813) |
| * com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.run(CacheClientProxy.java:2692) |
| * </code> |
| * <p/> |
| * This is because message dispatcher may get an IOException with |
| * "Proxy closing due to socket being closed locally" during/after |
| * terminateDispatching(false) above. |
| */ |
| Object task = _durableExpirationTask.getAndSet(null); |
| if (task != null) { |
| ((SystemTimerTask)task).cancel(); |
| } |
| } |
| |
| }; |
| if(this._durableExpirationTask.compareAndSet(null, task)) { |
| _cache.getCCPTimer().schedule(task, |
| getDurableTimeout()*1000L); |
| } |
| } |
| |
| protected void cancelDurableExpirationTask(boolean logMessage) { |
| SystemTimer.SystemTimerTask task = (SystemTimerTask) _durableExpirationTask.getAndSet(null); |
| if (task != null) { |
| if (logMessage) { |
| logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_CANCELLING_EXPIRATION_TASK_SINCE_THE_CLIENT_HAS_RECONNECTED, this)); |
| } |
| task.cancel(); |
| } |
| } |
| |
| /** |
| * Class <code>ClientInterestList</code> provides a convenient interface |
| * for manipulating client interest information. |
| */ |
| static protected class ClientInterestList |
| { |
| |
| final CacheClientProxy ccp; |
| |
| final Object id; |
| |
| /** |
| * An object used for synchronizing the interest lists |
| */ |
| final private Object interestListLock = new Object(); |
| |
| /** |
| * Regions that this client is interested in |
| */ |
| final protected Set<String> regions = new HashSet<String>(); |
| |
| /** |
| * Constructor. |
| */ |
| protected ClientInterestList(CacheClientProxy ccp, Object interestID) { |
| this.ccp = ccp; |
| this.id = interestID; |
| // this.id = getNextId(); |
| } |
| |
| /** |
| * Registers interest in the input region name and key |
| */ |
| protected void registerClientInterest(String regionName, |
| Object keyOfInterest, int interestType, boolean sendUpdatesAsInvalidates) |
| { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: registerClientInterest region={} key={}", ccp, regionName, keyOfInterest); |
| } |
| Set keysRegistered = null; |
| synchronized(this.interestListLock) { |
| LocalRegion r = (LocalRegion)this.ccp._cache.getRegion(regionName, true); |
| if (r == null) { |
| throw new RegionDestroyedException("Region could not be found for interest registration", regionName); |
| } |
| if ( ! (r instanceof CacheDistributionAdvisee) ) { |
| throw new IllegalArgumentException("region " + regionName + " is not distributed and does not support interest registration"); |
| } |
| FilterProfile p = r.getFilterProfile(); |
| keysRegistered = p.registerClientInterest(id, keyOfInterest, interestType, sendUpdatesAsInvalidates); |
| regions.add(regionName); |
| } |
| // Perform actions if any keys were registered |
| if ((keysRegistered != null) && containsInterestRegistrationListeners() |
| && !keysRegistered.isEmpty()) { |
| handleInterestEvent(regionName, keysRegistered, interestType, true); |
| } |
| } |
| |
| |
| protected FilterProfile getProfile(String regionName) { |
| try { |
| return this.ccp._cache.getFilterProfile(regionName); |
| } catch (CacheClosedException e) { |
| return null; |
| } |
| } |
| |
| /** |
| * Unregisters interest in the input region name and key |
| * |
| * @param regionName |
| * The fully-qualified name of the region in which to unregister |
| * interest |
| * @param keyOfInterest |
| * The key in which to unregister interest |
| */ |
| protected void unregisterClientInterest(String regionName, |
| Object keyOfInterest, int interestType) |
| { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: unregisterClientInterest region={} key={}", ccp, regionName, keyOfInterest); |
| } |
| FilterProfile p = getProfile(regionName); |
| Set keysUnregistered = null; |
| synchronized(this.interestListLock) { |
| if (p != null) { |
| keysUnregistered = p.unregisterClientInterest( |
| id, keyOfInterest, interestType); |
| if (!p.hasInterestFor(id)) { |
| this.regions.remove(regionName); |
| } |
| } else { |
| this.regions.remove(regionName); |
| } |
| } |
| if (keysUnregistered != null && !keysUnregistered.isEmpty()) { |
| handleInterestEvent(regionName, keysUnregistered, interestType, false); |
| } |
| } |
| |
| /** |
| * Registers interest in the input region name and list of keys |
| * |
| * @param regionName |
| * The fully-qualified name of the region in which to register |
| * interest |
| * @param keysOfInterest |
| * The list of keys in which to register interest |
| */ |
| protected void registerClientInterestList(String regionName, |
| List keysOfInterest, boolean sendUpdatesAsInvalidates) { |
| FilterProfile p = getProfile(regionName); |
| if (p == null) { |
| throw new RegionDestroyedException("Region not found during client interest registration", regionName); |
| } |
| Set keysRegistered = null; |
| synchronized(this.interestListLock) { |
| keysRegistered = p.registerClientInterestList(id, keysOfInterest, sendUpdatesAsInvalidates); |
| regions.add(regionName); |
| } |
| // Perform actions if any keys were registered |
| if (containsInterestRegistrationListeners() && !keysRegistered.isEmpty()) { |
| handleInterestEvent(regionName, keysRegistered, InterestType.KEY, true); |
| } |
| } |
| |
| /** |
| * Unregisters interest in the input region name and list of keys |
| * |
| * @param regionName |
| * The fully-qualified name of the region in which to unregister |
| * interest |
| * @param keysOfInterest |
| * The list of keys in which to unregister interest |
| */ |
| protected void unregisterClientInterestList(String regionName, |
| List keysOfInterest) |
| { |
| FilterProfile p = getProfile(regionName); |
| Set keysUnregistered = null; |
| synchronized(this.interestListLock) { |
| if (p != null) { |
| keysUnregistered = p.unregisterClientInterestList( |
| id, keysOfInterest); |
| if (!p.hasInterestFor(id)) { |
| regions.remove(regionName); |
| } |
| } else { |
| regions.remove(regionName); |
| } |
| } |
| // Perform actions if any keys were unregistered |
| if (!keysUnregistered.isEmpty()) { |
| handleInterestEvent(regionName, keysUnregistered, InterestType.KEY,false); |
| } |
| } |
| |
| /* |
| * Returns whether this interest list has any keys, patterns or filters of |
| * interest. It answers the question: Are any clients being notified because |
| * of this interest list? @return whether this interest list has any keys, |
| * patterns or filters of interest |
| */ |
| protected boolean hasInterest() { |
| return regions.size() > 0; |
| } |
| |
| protected void clearClientInterestList() { |
| boolean isClosed = ccp.getCache().isClosed(); |
| |
| synchronized(this.interestListLock) { |
| for (String regionName: regions) { |
| FilterProfile p = getProfile(regionName); |
| if (p == null) { |
| continue; |
| } |
| if (!isClosed) { |
| if (p.hasAllKeysInterestFor(id)) { |
| Set allKeys = new HashSet(); |
| allKeys.add(".*"); |
| allKeys = Collections.unmodifiableSet(allKeys); |
| handleInterestEvent(regionName, allKeys, |
| InterestType.REGULAR_EXPRESSION, false); |
| } |
| Set keysOfInterest = p.getKeysOfInterestFor(id); |
| if (keysOfInterest != null && keysOfInterest.size() > 0) { |
| handleInterestEvent(regionName, keysOfInterest, |
| InterestType.KEY, false); |
| } |
| Map<String,Pattern> patternsOfInterest = |
| p.getPatternsOfInterestFor(id); |
| if (patternsOfInterest != null && patternsOfInterest.size() > 0) { |
| handleInterestEvent(regionName, patternsOfInterest.keySet(), |
| InterestType.REGULAR_EXPRESSION, false); |
| } |
| } |
| p.clearInterestFor(id); |
| } |
| regions.clear(); |
| } |
| } |
| |
| |
| private void handleInterestEvent(String regionName, Set keysOfInterest, |
| int interestType, boolean isRegister) { |
| // Notify the region about this register interest event if: |
| // - the application has requested it |
| // - this is a primary CacheClientProxy (otherwise multiple notifications |
| // may occur) |
| // - it is a key interest type (regex is currently not supported) |
| InterestRegistrationEvent event = null; |
| if (NOTIFY_REGION_ON_INTEREST && this.ccp.isPrimary() |
| && interestType == InterestType.KEY) { |
| event = new InterestRegistrationEventImpl(this.ccp, regionName, |
| keysOfInterest, interestType, isRegister); |
| try { |
| notifyRegionOfInterest(event); |
| } |
| catch (Exception e) { |
| logger.warn(LocalizedStrings.CacheClientProxy_REGION_NOTIFICATION_OF_INTEREST_FAILED, e); |
| } |
| } |
| // Invoke interest registration listeners |
| if (containsInterestRegistrationListeners()) { |
| if (event == null) { |
| event = new InterestRegistrationEventImpl(this.ccp, regionName, |
| keysOfInterest, interestType, isRegister); |
| } |
| notifyInterestRegistrationListeners(event); |
| } |
| } |
| |
| private void notifyRegionOfInterest(InterestRegistrationEvent event) { |
| this.ccp.getCacheClientNotifier().handleInterestEvent(event); |
| } |
| |
| private void notifyInterestRegistrationListeners( |
| InterestRegistrationEvent event) { |
| this.ccp.getCacheClientNotifier().notifyInterestRegistrationListeners( |
| event); |
| } |
| |
| private boolean containsInterestRegistrationListeners() { |
| return this.ccp.getCacheClientNotifier() |
| .containsInterestRegistrationListeners(); |
| } |
| } |
| |
| |
| /** |
| * Class <code>MessageDispatcher</code> is a <code>Thread</code> that |
| * processes messages bound for the client by taking messages from the |
| * message queue and sending them to the client over the socket. |
| */ |
| static class MessageDispatcher extends Thread |
| { |
| |
| /** |
| * The queue of messages to be sent to the client |
| */ |
| protected final HARegionQueue _messageQueue; |
| |
| // /** |
| // * An int used to keep track of the number of messages dropped for logging |
| // * purposes. If greater than zero then a warning has been logged about |
| // * messages being dropped. |
| // */ |
| // private int _numberOfMessagesDropped = 0; |
| |
| /** |
| * The proxy for which this dispatcher is processing messages |
| */ |
| private final CacheClientProxy _proxy; |
| |
| // /** |
| // * The conflator faciliates message conflation |
| // */ |
| // protected BridgeEventConflator _eventConflator; |
| |
| /** |
| * Whether the dispatcher is stopped |
| */ |
| private volatile boolean _isStopped = true; |
| |
| /** |
| * @guarded.By _pausedLock |
| */ |
| //boolean _isPausedDispatcher = false; |
| |
| /** |
| * A lock object used to control pausing this dispatcher |
| */ |
| protected final Object _pausedLock = new Object(); |
| |
| /** |
| * An object used to protect when dispatching is being stopped. |
| */ |
| private final Object _stopDispatchingLock = new Object(); |
| |
| private final ReadWriteLock socketLock = new ReentrantReadWriteLock(); |
| |
| private final Lock socketWriteLock = socketLock.writeLock(); |
| // /** |
| // * A boolean verifying whether a warning has already been issued if the |
| // * message queue has reached its capacity. |
| // */ |
| // private boolean _messageQueueCapacityReachedWarning = false; |
| |
/**
 * Constructor. Creates the daemon dispatcher thread and the HA region queue
 * it reads from.
 *
 * @param proxy
 *          The <code>CacheClientProxy</code> for which this dispatcher is
 *          processing messages
 * @param name thread name for this dispatcher
 * @throws CacheException
 *           if an unexpected exception occurs while creating the message
 *           queue; cancellation and <code>RegionExistsException</code> are
 *           rethrown unchanged
 */
protected MessageDispatcher(CacheClientProxy proxy, String name) throws CacheException {
  super(LoggingThreadGroup.createThreadGroup(name, logger), name);
  setDaemon(true);
  this._proxy = proxy;

  // Create the message queue
  try {
    // Capacity and expiry come from the proxy's settings.
    HARegionQueueAttributes queueAttributes = new HARegionQueueAttributes();
    queueAttributes.setBlockingQueueCapacity(proxy._maximumMessageCount);
    queueAttributes.setExpiryTime(proxy._messageTimeToLive);

    // Register the proxy in the HA container under the queue's region name.
    ((HAContainerWrapper)proxy._cacheClientNotifier.getHaContainer())
        .putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()),
            getProxy());

    final boolean createDurableQueue = proxy.proxyID.isDurable();

    // Delta requires a GFE 6.1 or later client, delta propagation enabled in
    // the config, and client conflation not switched on.
    boolean canHandleDelta = false;
    if (proxy.clientVersion.compareTo(Version.GFE_61) >= 0) {
      canHandleDelta = InternalDistributedSystem.getAnyInstance().getConfig()
          .getDeltaPropagation()
          && this._proxy.clientConflation != HandShake.CONFLATION_ON;
    }

    if ((createDurableQueue || canHandleDelta) && logger.isDebugEnabled()) {
      logger.debug("Creating a durable HA queue");
    }

    this._messageQueue = HARegionQueue.getHARegionQueueInstance(
        getProxy().getHARegionName(),
        getCache(),
        queueAttributes,
        HARegionQueue.BLOCKING_HA_QUEUE,
        createDurableQueue,
        proxy._cacheClientNotifier.getHaContainer(),
        proxy.getProxyID(),
        this._proxy.clientConflation,
        this._proxy.isPrimary(),
        canHandleDelta);

    // Check if interests were registered during HARegion GII.
    if (this._proxy.hasRegisteredInterested()) {
      this._messageQueue.setHasRegisteredInterest(true);
    }
  }
  catch (CancelException cancelled) {
    throw cancelled;
  }
  catch (RegionExistsException alreadyExists) {
    throw alreadyExists;
  }
  catch (Exception unexpected) {
    // Give cancellation precedence, then wrap anything else.
    getCache().getCancelCriterion().checkCancelInProgress(unexpected);
    throw new CacheException(LocalizedStrings.CacheClientProxy_EXCEPTION_OCCURRED_WHILE_TRYING_TO_CREATE_A_MESSAGE_QUEUE.toLocalizedString(), unexpected) {
      private static final long serialVersionUID = 0L;};
  }
}
| |
| private CacheClientProxy getProxy() { |
| return this._proxy; |
| } |
| private GemFireCacheImpl getCache() { |
| return getProxy().getCache(); |
| } |
| private Socket getSocket() { |
| return getProxy().getSocket(); |
| } |
| private ByteBuffer getCommBuffer() { |
| return getProxy().getCommBuffer(); |
| } |
| private CacheClientProxyStats getStatistics() { |
| return getProxy().getStatistics(); |
| } |
| |
| private void basicStopDispatching() { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: notified dispatcher to stop", this); |
| } |
| this._isStopped = true; |
| // this.interrupt(); // don't interrupt here. Let close(boolean) do this. |
| } |
| |
| @Override |
| public String toString() { |
| return getProxy().toString(); |
| } |
| |
| /** |
| * Notifies the dispatcher to stop dispatching. |
| * |
| * @param checkQueue |
| * Whether to check the message queue for any unprocessed messages |
| * and process them for MAXIMUM_SHUTDOWN_PEEKS. |
| * |
| * @see CacheClientProxy#MAXIMUM_SHUTDOWN_PEEKS |
| */ |
| protected synchronized void stopDispatching(boolean checkQueue) |
| { |
| if (isStopped()) { |
| return; |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Stopping dispatching", this); |
| } |
| if (!checkQueue) { |
| basicStopDispatching(); |
| return; |
| } |
| |
| // Stay alive until the queue is empty or a number of peeks is reached. |
| List events = null; |
| try { |
| for (int numberOfPeeks = 0; numberOfPeeks < MAXIMUM_SHUTDOWN_PEEKS; |
| ++numberOfPeeks) { |
| boolean interrupted = Thread.interrupted(); |
| try { |
| events = this._messageQueue.peek(1, -1); |
| if (events == null || events.size() == 0) { |
| break; |
| } |
| if (logger.isDebugEnabled()){ |
| logger.debug("Waiting for client to drain queue: {}", _proxy.proxyID); |
| } |
| Thread.sleep(500); |
| } |
| catch (InterruptedException e) { |
| interrupted = true; |
| /*GemFireCache c = (GemFireCache)_cache; |
| c.getDistributedSystem().getCancelCriterion().checkCancelInProgress(e);*/ |
| } |
| catch (CancelException e) { |
| break; |
| } |
| catch (CacheException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Exception occurred while trying to stop dispatching", this, e); |
| } |
| } |
| finally { |
| if (interrupted) Thread.currentThread().interrupt(); |
| } |
| } // for |
| } |
| finally { |
| basicStopDispatching(); |
| } |
| } |
| |
| /** |
| * Returns whether the dispatcher is stopped |
| * |
| * @return whether the dispatcher is stopped |
| */ |
| protected boolean isStopped() |
| { |
| return this._isStopped; |
| } |
| |
| /** |
| * Returns the size of the queue for heuristic purposes. This size may be |
| * changing concurrently if puts / gets are occurring at the same time. |
| * |
| * @return the size of the queue |
| */ |
| protected int getQueueSize() |
| { |
| return this._messageQueue == null ? 0 : this._messageQueue.size(); |
| } |
| |
| /** |
| * Returns the size of the queue calculated through stats |
| * This includes events that have dispatched but have yet been removed |
| * @return the size of the queue |
| */ |
| protected int getQueueSizeStat() |
| { |
| if (this._messageQueue != null) { |
| HARegionQueueStats stats = this._messageQueue.getStatistics(); |
| return ((int)(stats.getEventsEnqued() - stats.getEventsRemoved() - stats.getEventsConflated() - stats.getMarkerEventsConflated() - stats.getEventsExpired() - stats.getEventsRemovedByQrm() - stats.getEventsTaken() - stats.getNumVoidRemovals())); |
| } |
| return 0; |
| } |
| |
| protected void drainClientCqEvents(ClientProxyMembershipID clientId, InternalCqQuery cqToClose) { |
| this._messageQueue.closeClientCq(clientId, cqToClose); |
| } |
| |
| /** |
| * Runs the dispatcher by taking a message from the queue and sending it to |
| * the client attached to this proxy. |
| */ |
| @Override |
| public void run() |
| { |
| boolean exceptionOccured = false; |
| this._isStopped = false; |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Beginning to process events", this); |
| } |
| // for testing purposes |
| if (isSlowStartForTesting) { |
| long slowStartTimeForTesting = Long.getLong(KEY_SLOW_START_TIME_FOR_TESTING, |
| DEFAULT_SLOW_STARTING_TIME).longValue(); |
| long elapsedTime = 0; |
| long startTime = System.currentTimeMillis(); |
| while ((slowStartTimeForTesting > elapsedTime) && isSlowStartForTesting) { |
| try { |
| Thread.sleep(500); |
| } |
| catch (InterruptedException ignore) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Slow start for testing interrupted"); |
| } |
| break; |
| } |
| elapsedTime = System.currentTimeMillis() - startTime; |
| } |
| if(slowStartTimeForTesting < elapsedTime) { |
| isSlowStartForTesting = false; |
| } |
| } |
| |
| ClientMessage clientMessage = null; |
| while (!isStopped()) { |
| // SystemFailure.checkFailure(); DM's stopper does this |
| if (this._proxy._cache.getCancelCriterion().cancelInProgress() != null) { |
| break; |
| } |
| try { |
| // If paused, wait to be told to resume (or interrupted if stopped) |
| if (getProxy().isPaused()) { |
| try { |
| // ARB: Before waiting for resumption, process acks from client. |
| // This will reduce the number of duplicates that a client receives after |
| // reconnecting. |
| if (this._messageQueue.size() > 0) { |
| Thread.sleep(50); |
| } |
| while (!this._messageQueue.isEmptyAckList()&& this._messageQueue.isPeekInitialized()) { |
| this._messageQueue.remove(); |
| } |
| } |
| catch (InterruptedException ex) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_SLEEP_INTERRUPTED, this)); |
| } |
| waitForResumption(); |
| } |
| try { |
| clientMessage = (ClientMessage)this._messageQueue.peek(); |
| } |
| catch (RegionDestroyedException skipped) { |
| break; |
| } |
| getStatistics().setQueueSize(this._messageQueue.size()); |
| if (isStopped()) { |
| break; |
| } |
| // Process the message |
| long start = getStatistics().startTime(); |
| //// BUGFIX for BUG#38206 and BUG#37791 |
| boolean isDispatched = dispatchMessage(clientMessage); |
| getStatistics().endMessage(start); |
| if(isDispatched){ |
| this._messageQueue.remove(); |
| if (clientMessage instanceof ClientMarkerMessageImpl) { |
| getProxy().markerEnqueued = false; |
| } |
| } |
| clientMessage = null; |
| } |
| catch (IOException e) { |
| // Added the synchronization below to ensure that exception handling |
| // does not occur while stopping the dispatcher and vice versa. |
| synchronized (this._stopDispatchingLock) { |
| // An IOException occurred while sending a message to the |
| // client. If the processor is not already stopped, assume |
| // the client is dead and stop processing. |
| if (!isStopped() && !getProxy().isPaused()) { |
| if ("Broken pipe".equals(e.getMessage())) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_PROXY_CLOSING_DUE_TO_UNEXPECTED_BROKEN_PIPE_ON_SOCKET_CONNECTION, this)); |
| } else if ("Connection reset".equals(e.getMessage())) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_PROXY_CLOSING_DUE_TO_UNEXPECTED_RESET_ON_SOCKET_CONNECTION, this)); |
| } |
| else if ("Connection reset by peer".equals(e.getMessage())) { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_PROXY_CLOSING_DUE_TO_UNEXPECTED_RESET_BY_PEER_ON_SOCKET_CONNECTION, this)); |
| } |
| else if ("Socket is closed".equals(e.getMessage()) || "Socket Closed".equals(e.getMessage())) { |
| logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_PROXY_CLOSING_DUE_TO_SOCKET_BEING_CLOSED_LOCALLY, this)); |
| } |
| else { |
| logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_AN_UNEXPECTED_IOEXCEPTION_OCCURRED_SO_THE_PROXY_WILL_BE_CLOSED, this), e); |
| } |
| // Let the CacheClientNotifier discover the proxy is not alive. |
| // See isAlive(). |
| // getProxy().close(false); |
| |
| pauseOrUnregisterProxy(); |
| } // _isStopped |
| } // synchronized |
| exceptionOccured = true; |
| } // IOException |
| catch (InterruptedException e) { |
| // If the thread is paused, ignore the InterruptedException and |
| // continue. The proxy is null if stopDispatching has been called. |
| if (getProxy().isPaused()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: interrupted because it is being paused. It will continue and wait for resumption.", this); |
| } |
| Thread.interrupted(); |
| continue; |
| } |
| |
| // no need to reset the bit; we're exiting |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: interrupted", this); |
| } |
| break; |
| } |
| catch (CancelException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: shutting down due to cancellation", this); |
| } |
| exceptionOccured = true; // message queue is defunct, don't try to read it. |
| break; |
| } |
| catch (RegionDestroyedException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: shutting down due to loss of message queue", this); |
| } |
| exceptionOccured = true; // message queue is defunct, don't try to read it. |
| break; |
| } |
| catch (Exception e) { |
| // An exception occured while processing a message. Since it |
| // is not an IOException, the client may still be alive, so |
| // continue processing. |
| if (!isStopped()) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0__AN_UNEXPECTED_EXCEPTION_OCCURRED, this), e); |
| } |
| } |
| } |
| |
| // Processing gets here if isStopped=true. What is this code below doing? |
| List list = null; |
| if(!exceptionOccured) { |
| try { |
| // Clear the interrupt status if any, |
| Thread.interrupted(); |
| int size = this._messageQueue.size(); |
| list = this._messageQueue.peek(size); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: After flagging the dispatcher to stop , the residual List of messages to be dispatched={} size={}", this, list, list.size()); |
| } |
| if (list.size() > 0) { |
| long start = getStatistics().startTime(); |
| Iterator itr = list.iterator(); |
| while (itr.hasNext()) { |
| dispatchMessage((ClientMessage)itr.next()); |
| getStatistics().endMessage(start); |
| // @todo asif: shouldn't we call itr.remove() since the current msg |
| // has been sent? That way list will be more accurate |
| // if we have an exception. |
| } |
| this._messageQueue.remove(); |
| } |
| } |
| catch (CancelException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("CacheClientNotifier stopped due to cancellation"); |
| } |
| } |
| catch (Exception ignore) { |
| //if (logger.isInfoEnabled()) { |
| StringId extraMsg = null; |
| |
| if ("Broken pipe".equals(ignore.getMessage())) { |
| extraMsg = LocalizedStrings.CacheClientProxy_PROBLEM_CAUSED_BY_BROKEN_PIPE_ON_SOCKET; |
| } |
| else if (ignore instanceof RegionDestroyedException) { |
| extraMsg = LocalizedStrings.CacheClientProxy_PROBLEM_CAUSED_BY_MESSAGE_QUEUE_BEING_CLOSED; |
| } |
| final Object[] msgArgs = new Object[] { |
| ((!isStopped()) ? this.toString() + ": " : ""), |
| ((list == null) ? 0 : list.size())}; |
| if (extraMsg != null) { |
| //Dont print exception details, but add on extraMsg |
| logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_POSSIBILITY_OF_NOT_BEING_ABLE_TO_SEND_SOME_OR_ALL_THE_MESSAGES_TO_CLIENTS_TOTAL_MESSAGES_CURRENTLY_PRESENT_IN_THE_LIST_1, msgArgs)); |
| logger.info(extraMsg); |
| } else { |
| //Print full stacktrace |
| logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_POSSIBILITY_OF_NOT_BEING_ABLE_TO_SEND_SOME_OR_ALL_THE_MESSAGES_TO_CLIENTS_TOTAL_MESSAGES_CURRENTLY_PRESENT_IN_THE_LIST_1, msgArgs), ignore); |
| } |
| } |
| |
| if (list != null && logger.isTraceEnabled()) { |
| logger.trace("Messages remaining in the list are: {}", list); |
| } |
| |
| //} |
| } |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Dispatcher thread is ending", this); |
| } |
| |
| } |
| |
| private void pauseOrUnregisterProxy() { |
| if (getProxy().isDurable()) { |
| try { |
| getProxy().pauseDispatching(); |
| } catch (Exception ex) { |
| // see bug 40611; we catch Exception here because |
| // we once say an InterruptedException here. |
| // log a warning saying we couldn't pause? |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: {}", this, ex); |
| } |
| } |
| } |
| else { |
| this._isStopped = true; |
| } |
| |
| // Stop the ServerConnections. This will force the client to |
| // server communication to close. |
| ClientHealthMonitor chm = ClientHealthMonitor.getInstance(); |
| |
| // Note now that _proxy is final the following comment is no |
| // longer true. the _isStopped check should be sufficient. |
| // Added the test for this._proxy != null to prevent bug 35801. |
| // The proxy could have been stopped after this IOException has |
| // been caught and here, so the _proxy will be null. |
| if (chm != null) { |
| ClientProxyMembershipID proxyID = getProxy().proxyID; |
| chm.removeAllConnectionsAndUnregisterClient(proxyID); |
| if (!getProxy().isDurable()) { |
| getProxy().getCacheClientNotifier().unregisterClient(proxyID, false); |
| } |
| } |
| } |
| |
| /** |
| * Sends a message to the client attached to this proxy |
| * |
| * @param clientMessage |
| * The <code>ClientMessage</code> to send to the client |
| * |
| * @throws IOException |
| */ |
| protected boolean dispatchMessage(ClientMessage clientMessage) |
| throws IOException |
| { |
| boolean isDispatched = false ; |
| if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) { |
| logger.trace(LogMarker.BRIDGE_SERVER, "Dispatching {}", clientMessage); |
| } |
| Message message = null; |
| |
| // byte[] latestValue = |
| // this._eventConflator.getLatestValue(clientMessage); |
| |
| if (clientMessage instanceof ClientUpdateMessage) { |
| byte[] latestValue = (byte[])((ClientUpdateMessage)clientMessage).getValue(); |
| if (logger.isTraceEnabled()) { |
| StringBuilder msg = new StringBuilder(100); |
| msg.append(this).append(": Using latest value: ").append(Arrays.toString(latestValue)); |
| if (((ClientUpdateMessage)clientMessage).valueIsObject()) { |
| if (latestValue != null) { |
| msg.append(" (").append(deserialize(latestValue)).append(")"); |
| } |
| msg.append(" for ").append(clientMessage); |
| } |
| logger.trace(msg.toString()); |
| } |
| |
| message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy(), |
| latestValue); |
| |
| if (AFTER_MESSAGE_CREATION_FLAG) { |
| BridgeObserver bo = BridgeObserverHolder.getInstance(); |
| bo.afterMessageCreation(message); |
| } |
| } |
| else { |
| message = clientMessage.getMessage(getProxy(), true /* notify */); |
| } |
| |
| // ////////////////////////////// |
| // TEST CODE BEGIN (Throws exception to test closing proxy) |
| // if (true) throw new IOException("test"); |
| // TEST CODE END |
| // ////////////////////////////// |
| // Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID, latestValue); |
| //Message message = clientMessage.getMessage(); removed during merge. |
| // BugFix for BUG#38206 and BUG#37791 |
| if (!this._proxy.isPaused()) { |
| sendMessage(message); |
| |
| // ////////////////////////////// |
| // TEST CODE BEGIN (Throws exception to test closing proxy) |
| // if (true) throw new IOException("test"); |
| // TEST CODE END |
| // ////////////////////////////// |
| //Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID, latestValue); |
| //Message message = clientMessage.getMessage(); removed during merge. |
| //message.setComms(getSocket(), getCommBuffer(), getStatistics()); |
| //message.send(); |
| |
| // ////////////////////////////// |
| // TEST CODE BEGIN (Introduces random wait in client) |
| // Sleep a random number of ms |
| // java.util.Random rand = new java.util.Random(); |
| // try {Thread.sleep(rand.nextInt(5));} catch (InterruptedException e) {} |
| // TEST CODE END |
| // ////////////////////////////// |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("{}: Dispatched {}", this, clientMessage); |
| } |
| isDispatched = true; |
| } |
| else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Message Dispatcher of a Paused CCProxy is trying to dispatch message"); |
| } |
| } |
| if (isDispatched) { |
| this._messageQueue.getStatistics().incEventsDispatched(); |
| } |
| return isDispatched; |
| } |
| |
| private void sendMessage(Message message) throws IOException { |
| if (message == null) { |
| return; |
| } |
| this.socketWriteLock.lock(); |
| try { |
| message.setComms(getSocket(), getCommBuffer(), getStatistics()); |
| message.send(); |
| getProxy().resetPingCounter(); |
| } finally { |
| this.socketWriteLock.unlock(); |
| } |
| } |
| |
| /** |
| * Add the input client message to the message queue |
| * |
| * @param clientMessage |
| * The <code>Conflatable</code> to add to the queue |
| */ |
| protected void enqueueMessage(Conflatable clientMessage) |
| { |
| try { |
| this._messageQueue.put(clientMessage); |
| if(this._proxy.isPaused() && this._proxy.isDurable()){ |
| this._proxy._cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Queued message while Durable Client is away {}", this, clientMessage); |
| } |
| } else { |
| // [bruce] we don't really know that it was added, so don't log this |
| // if (logger.isDebugEnabled() || BridgeServerImpl.VERBOSE) { |
| // logger.debug(LocalizedStrings.DEBUG, this + " added message to queue: " + clientMessage); |
| // } |
| } |
| } |
| catch (CancelException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| if (!isStopped()) { |
| this._proxy._statistics.incMessagesFailedQueued(); |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_ADD_MESSAGE_TO_QUEUE, this), e); |
| } |
| } |
| } |
| |
| |
| protected void enqueueMarker(ClientMessage message) { |
| try { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Queueing marker message. <{}>. The queue contains {} entries.", this, message, getQueueSize()); |
| } |
| this._messageQueue.put(message); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Queued marker message. The queue contains {} entries.", this, getQueueSize()); |
| } |
| } |
| catch (CancelException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| if (!isStopped()) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0__EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_ADD_MESSAGE_TO_QUEUE, this), e); |
| } |
| } |
| } |
| |
| private void sendMessageDirectly(ClientMessage clientMessage) { |
| Message message; |
| try { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Dispatching directly: {}", this, clientMessage); |
| } |
| message = clientMessage.getMessage(getProxy(), true); |
| sendMessage(message); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Dispatched directly: {}", this, clientMessage); |
| } |
| // The exception handling code was modeled after the MessageDispatcher |
| // run method |
| } catch (IOException e) { |
| synchronized (this._stopDispatchingLock) { |
| // Pause or unregister proxy |
| if (!isStopped() && !getProxy().isPaused()) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0__AN_UNEXPECTED_EXCEPTION_OCCURRED, this), e); |
| pauseOrUnregisterProxy(); |
| } |
| } |
| } catch (Exception e) { |
| if (!isStopped()) { |
| logger.fatal(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0__AN_UNEXPECTED_EXCEPTION_OCCURRED, this), e); |
| } |
| } |
| } |
| |
| protected void waitForResumption() throws InterruptedException { |
| synchronized (this._pausedLock) { |
| logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0__PAUSING_PROCESSING, this)); |
| if (!getProxy().isPaused()) { |
| return; |
| } |
| while (getProxy().isPaused()) { |
| this._pausedLock.wait(); |
| } |
| // Fix for #48571 |
| _messageQueue.clearPeekedIDs(); |
| } |
| } |
| |
| protected void resumeDispatching() { |
| logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientProxy_0__RESUMING_PROCESSING, this)); |
| |
| // Notify thread to resume |
| this._pausedLock.notifyAll(); |
| } |
| |
| protected Object deserialize(byte[] serializedBytes) |
| { |
| Object deserializedObject = serializedBytes; |
| // This is a debugging method so ignore all exceptions like |
| // ClassNotFoundException |
| try { |
| DataInputStream dis = new DataInputStream(new ByteArrayInputStream( |
| serializedBytes)); |
| deserializedObject = DataSerializer.readObject(dis); |
| } |
| catch (Exception e) { |
| } |
| return deserializedObject; |
| } |
| |
| protected void initializeTransients() |
| { |
| while (!this._messageQueue.isEmptyAckList()&& this._messageQueue.isPeekInitialized()) { |
| try { |
| this._messageQueue.remove(); |
| }catch(InterruptedException e){ |
| e.printStackTrace(); |
| } |
| } |
| this._messageQueue.initializeTransients(); |
| } |
| } |
| |
| /** |
| * Returns the current number of CQS the client installed. |
| * |
| * @return int the current count of CQs for this client |
| */ |
| public int getCqCount() { |
| synchronized (this) { |
| return this._statistics.getCqCount(); |
| } |
| } |
| |
| /** |
| * Increment the number of CQs the client installed |
| * |
| */ |
| public void incCqCount() { |
| synchronized (this) { |
| this._statistics.incCqCount(); |
| } |
| } |
| |
| /** |
| * Decrement the number of CQs the client installed |
| * |
| */ |
| public synchronized void decCqCount() { |
| synchronized (this) { |
| this._statistics.decCqCount(); |
| } |
| } |
| |
| /** |
| * Returns true if the client has one CQ |
| * |
| * @return true if the client has one CQ |
| */ |
| public boolean hasOneCq() { |
| synchronized (this) { |
| return this._statistics.getCqCount() == 1; |
| } |
| } |
| |
| /** |
| * Returns true if the client has no CQs |
| * |
| * @return true if the client has no CQs |
| */ |
| public boolean hasNoCq() { |
| synchronized (this) { |
| return this._statistics.getCqCount() == 0; |
| } |
| } |
| |
| /** |
| * Get map of regions with empty data policy |
| * |
| * @since 6.1 |
| */ |
| public Map getRegionsWithEmptyDataPolicy() { |
| return regionsWithEmptyDataPolicy; |
| } |
| |
| public int incrementAndGetPingCounter() { |
| int pingCount = this.pingCounter.incrementAndGet(); |
| return pingCount; |
| } |
| |
| public void resetPingCounter() { |
| this.pingCounter.set(0); |
| } |
| |
| /** |
| * Returns the number of seconds that have elapsed since the Client proxy |
| * created. |
| * |
| * @since 7.0 |
| */ |
| public long getUpTime() { |
| return (long) ((System.currentTimeMillis() - this.creationDate.getTime()) / 1000); |
| } |
| |
| public interface TestHook { |
| public void doTestHook(String spot); |
| } |
| public static TestHook testHook; |
| } |