blob: 64731d11cb5ac2918b1fddd25799b27c3701cb4c [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.tier.sockets;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.NoRouteToHostException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLException;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.InvalidDeltaException;
import com.gemstone.gemfire.StatisticDescriptor;
import com.gemstone.gemfire.Statistics;
import com.gemstone.gemfire.StatisticsType;
import com.gemstone.gemfire.StatisticsTypeFactory;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.InterestResultPolicy;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.client.ServerRefusedConnectionException;
import com.gemstone.gemfire.cache.client.internal.ClientUpdater;
import com.gemstone.gemfire.cache.client.internal.Endpoint;
import com.gemstone.gemfire.cache.client.internal.EndpointManager;
import com.gemstone.gemfire.cache.client.internal.GetEventValueOp;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.client.internal.QueueManager;
import com.gemstone.gemfire.cache.query.internal.cq.CqService;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionStats;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem.DisconnectListener;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.InternalInstantiator;
import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.SocketUtils;
import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.BridgeObserver;
import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
import com.gemstone.gemfire.internal.cache.tier.MessageType;
import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.InternalLogWriter;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
import com.gemstone.gemfire.security.AuthenticationFailedException;
import com.gemstone.gemfire.security.AuthenticationRequiredException;
import com.gemstone.gemfire.security.GemFireSecurityException;
/**
* <code>CacheClientUpdater</code> is a thread that processes update messages
* from a cache server and
* {@linkplain com.gemstone.gemfire.cache.Region#localInvalidate(Object) invalidates}
* the local cache based on the contents of those messages.
*
* @author Barry Oglesby
* @since 3.5
*/
public class CacheClientUpdater extends Thread implements ClientUpdater,
DisconnectListener {
private static final Logger logger = LogService.getLogger();
/**
* true if the constructor successfully created a connection. If false, the
* run method for this thread immediately exits.
*/
private final boolean connected;
/**
* System of which we are a part
*/
private final InternalDistributedSystem system;
/**
* The socket by which we communicate with the server
*/
private final Socket socket;
/**
* The output stream of the socket
*/
private final OutputStream out;
/**
* The input stream of the socket
*/
private final InputStream in;
/**
* Failed updater from the endpoint previously known as the primary
*/
private volatile ClientUpdater failedUpdater;
/**
* The buffer upon which we receive messages
*/
private final ByteBuffer commBuffer;
private boolean commBufferReleased;
private final CCUStats stats;
/**
* Cache for which we provide service
*/
private /*final*/ GemFireCacheImpl cache;
private /*final*/ CachedRegionHelper cacheHelper;
/**
* Principle flag to signal thread's run loop to terminate
*/
private final AtomicBoolean continueProcessing = new AtomicBoolean(true);
/**
* Is the client durable
* Used for bug 39010 fix
*/
private final boolean isDurableClient;
/**
* Represents the server we are connected to
*/
private final InternalDistributedMember serverId;
/**
* true if the EndPoint represented by this updater thread is primary
*/
private final boolean isPrimary;
/**
* Added to avoid recording of the event if the concerned operation failed.
* See #43247
*/
private boolean isOpCompleted;
public final static String CLIENT_UPDATER_THREAD_NAME = "Cache Client Updater Thread ";
/*
* to enable test flag
*/
public static boolean isUsedByTest;
/**
* Indicates if full value was requested from server as a result of failure in
* applying delta bytes.
*/
public static boolean fullValueRequested = false;
// /**
// * True if this thread been initialized. Indicates that the run thread is
// * initialized and ready to process messages
// * <p>
// * TODO is this still needed?
// * <p>
// * Accesses synchronized via <code>this</code>
// *
// * @see #notifyInitializationComplete()
// * @see #waitForInitialization()
// */
// private boolean initialized = false;
private final ServerLocation location;
// TODO - remove these fields
private QueueManager qManager = null;
private EndpointManager eManager = null;
private Endpoint endpoint = null;
static private final long MAX_CACHE_WAIT =
Long.getLong("gemfire.CacheClientUpdater.MAX_WAIT", 120).longValue(); // seconds
/**
* Return true if cache appears
* @return true if cache appears
*/
private boolean waitForCache() {
GemFireCacheImpl c;
long tilt = System.currentTimeMillis() + MAX_CACHE_WAIT * 1000;
for (;;) {
if (quitting()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_0_ABANDONED_WAIT_DUE_TO_CANCELLATION, this));
return false;
}
if (!this.connected) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_0_ABANDONED_WAIT_BECAUSE_IT_IS_NO_LONGER_CONNECTED, this));
return false;
}
if (System.currentTimeMillis() > tilt) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_0_WAIT_TIMED_OUT_MORE_THAN_1_SECONDS, new Object[] { this,MAX_CACHE_WAIT }));
return false;
}
c = GemFireCacheImpl.getInstance();
if (c != null && !c.isClosed()) {
break;
}
boolean interrupted = Thread.interrupted();
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
interrupted = true;
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
this.cache = c;
this.cacheHelper = new CachedRegionHelper(c);
return true;
}
/**
* Creates a new <code>CacheClientUpdater</code> with a given name that
* waits for a server to connect on a given port.
*
* @param name
* descriptive name, used for our ThreadGroup
* @param location
* the endpoint we represent
* @param primary
* true if our endpoint is primary TODO ask the ep for this?
* @param ids
* the system we are distributing messages through
*
* @throws AuthenticationRequiredException
* when client is not configured to send credentials using
* security-* system properties but server expects credentials
* @throws AuthenticationFailedException
* when authentication of the client fails
* @throws ServerRefusedConnectionException
* when handshake fails for other reasons like using durable
* client ID that is already in use by another client or some
* server side exception while reading handshake/verifying
* credentials
*/
public CacheClientUpdater(
String name, ServerLocation location,
boolean primary, DistributedSystem ids,
HandShake handshake, QueueManager qManager, EndpointManager eManager,
Endpoint endpoint, int handshakeTimeout) throws AuthenticationRequiredException,
AuthenticationFailedException, ServerRefusedConnectionException {
super(LoggingThreadGroup.createThreadGroup("Client update thread"), name);
this.setDaemon(true);
this.system = (InternalDistributedSystem)ids;
this.isDurableClient = handshake.getMembership().isDurable();
this.isPrimary = primary;
this.location = location;
this.qManager = qManager;
//this holds the connection which this threads reads
this.eManager = eManager;
this.endpoint = endpoint;
this.stats = new CCUStats(this.system, this.location);
// Create the connection...
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("Creating asynchronous update connection");
}
boolean success = false;
Socket mySock = null;
InternalDistributedMember sid = null;
ByteBuffer cb = null;
OutputStream tmpOut = null;
InputStream tmpIn = null;
try {
/** Size of the server-to-client communication socket buffers */
int socketBufferSize = Integer.getInteger(
"BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue();
if (!SocketCreator.getDefaultInstance()
.isHostReachable(InetAddress.getByName(location.getHostName()))) {
throw new NoRouteToHostException("Server is not reachable: " + location.getHostName());
}
mySock = SocketCreator.getDefaultInstance().connectForClient(
location.getHostName(), location.getPort(), handshakeTimeout, socketBufferSize);
mySock.setTcpNoDelay(true);
mySock.setSendBufferSize(socketBufferSize);
// Verify buffer sizes
verifySocketBufferSize(socketBufferSize, mySock.getReceiveBufferSize(), "receive");
verifySocketBufferSize(socketBufferSize, mySock.getSendBufferSize(), "send");
// set the timeout for the handshake
mySock.setSoTimeout(handshakeTimeout);
tmpOut = SocketUtils.getOutputStream(mySock);
tmpIn = SocketUtils.getInputStream(mySock);
if (isDebugEnabled) {
logger.debug("Initialized server-to-client socket with send buffer size: {} bytes and receive buffer size: {} bytes", mySock.getSendBufferSize(), mySock.getReceiveBufferSize());
}
if (isDebugEnabled) {
logger.debug("Created connection from {}:{} to CacheClientNotifier on port {} for server-to-client communication", mySock.getInetAddress().getHostAddress(), mySock.getLocalPort(), mySock.getPort());
}
ServerQueueStatus sqs = handshake.greetNotifier(mySock, this.isPrimary, location);
if (sqs.isPrimary() || sqs.isNonRedundant()) {
PoolImpl pool = (PoolImpl)this.qManager.getPool();
if (!pool.getReadyForEventsCalled()) {
pool.setPendingEventCount(sqs.getServerQueueSize());
}
}
{
int bufSize = 1024;
try {
bufSize = mySock.getSendBufferSize();
if (bufSize < 1024) {
bufSize = 1024;
}
}
catch (SocketException ignore) {
}
cb = ServerConnection.allocateCommBuffer(bufSize, mySock);
}
{
// create a "server" memberId we currently don't know much about the
// server.
// Would be nice for it to send us its member id
// @todo - change the serverId to use the endpoint's getMemberId() which
// returns a
// DistributedMember (once gfecq branch is merged to trunk).
MemberAttributes ma = new MemberAttributes(0, -1, DistributionManager.NORMAL_DM_TYPE, -1, null, null, null);
sid = new InternalDistributedMember(mySock.getInetAddress(), mySock.getPort(), false, true, ma);
}
success = true;
}
catch (ConnectException e) {
if (!quitting()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_0_CONNECTION_WAS_REFUSED, this));
}
}
catch (SSLException ex) {
if (!quitting()) {
getSecurityLogger().warning(
LocalizedStrings.CacheClientUpdater_0_SSL_NEGOTIATION_FAILED_1,
new Object[] { this, ex});
throw new AuthenticationFailedException(
LocalizedStrings.CacheClientUpdater_SSL_NEGOTIATION_FAILED_WITH_ENDPOINT_0
.toLocalizedString(location), ex);
}
}
catch (GemFireSecurityException ex) {
if (!quitting()) {
getSecurityLogger().warning(
LocalizedStrings.CacheClientUpdater_0_SECURITY_EXCEPTION_WHEN_CREATING_SERVERTOCLIENT_COMMUNICATION_SOCKET_1,
new Object[] {this, ex});
throw ex;
}
}
catch (IOException e) {
if (!quitting()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_0_CAUGHT_FOLLOWING_EXECPTION_WHILE_ATTEMPTING_TO_CREATE_A_SERVER_TO_CLIENT_COMMUNICATION_SOCKET_AND_WILL_EXIT_1, new Object[] {this, e}), logger.isDebugEnabled() ? e : null);
}
eManager.serverCrashed(this.endpoint);
}
catch (ClassNotFoundException e) {
if (!quitting()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_CLASS_NOT_FOUND, e.getMessage()));
}
}
finally {
connected = success;
if (mySock != null) {
try {
mySock.setSoTimeout(0);
} catch (SocketException e) {
// ignore: nothing we can do about this
}
}
if (connected) {
socket = mySock;
out = tmpOut;
in = tmpIn;
serverId = sid;
commBuffer = cb;
// Don't want the timeout after handshake
if (mySock != null) {
try {
mySock.setSoTimeout(0);
} catch (SocketException ignore) {
}
}
}
else {
socket = null;
serverId = null;
commBuffer = null;
out = null;
in = null;
if (mySock != null) {
try {
mySock.close();
}
catch (IOException ioe) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_CLOSING_SOCKET_IN_0_FAILED, this), ioe);
}
}
}
}
}
private void releaseCommBuffer() {
if (!this.commBufferReleased) {
if (this.commBuffer != null) {
synchronized (this.commBuffer) {
if (!this.commBufferReleased) {
this.commBufferReleased = true;
ServerConnection.releaseCommBuffer(this.commBuffer);
}
}
}
}
}
public boolean isConnected() {
return connected;
}
public boolean isPrimary() {
return isPrimary;
}
public InternalLogWriter getSecurityLogger() {
return this.qManager.getSecurityLogger();
}
public void setFailedUpdater(ClientUpdater failedUpdater) {
this.failedUpdater = failedUpdater;
}
/**
* Performs the work of the client update thread. Creates a
* <code>ServerSocket</code> and waits for the server to connect to it.
*/
@Override
public void run() {
boolean addedListener = false;
EntryLogger.setSource(serverId, "RI");
try {
this.system.addDisconnectListener(this);
addedListener = true;
if (!waitForCache()) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_0_NO_CACHE_EXITING, this));
return;
}
processMessages();
}
catch (CancelException e) {
return; // just bail
}
finally {
if (addedListener) {
this.system.removeDisconnectListener(this);
}
this.close();
EntryLogger.clearSource();
}
}
// /**
// * Waits for this thread to be initialized
// *
// * @return true if initialized; false if stopped before init
// */
// public boolean waitForInitialization() {
// boolean result = false;
// // Yogesh : waiting on this thread object is a bad idea
// // as when thread exits it notifies to the waiting threads.
// synchronized (this) {
// for (;;) {
// if (quitting()) {
// break;
// }
// boolean interrupted = Thread.interrupted();
// try {
// this.wait(100); // spurious wakeup ok // timed wait, should fix lost notification problem rahul.
// }
// catch (InterruptedException e) {
// interrupted = true;
// }
// finally {
// if (interrupted) {
// Thread.currentThread().interrupt();
// }
// }
// } // while
// // Even if we succeed, there is a risk that we were shut down
// // Can't check for cache; it isn't set yet :-(
// this.system.getCancelCriterion().checkCancelInProgress(null);
// result = this.continueProcessing;
// } // synchronized
// return result;
// }
// /**
// * @see #waitForInitialization()
// */
// private void notifyInitializationComplete() {
// synchronized (this) {
// this.initialized = true;
// this.notifyAll();
// }
// }
/**
* Notifies this thread to stop processing
*/
protected void stopProcessing() {
continueProcessing.set(false);// = false;
}
/**
* Stops the updater. It will wait for a while for the thread to finish to try
* to prevent duplicates. Note: this method is not named stop because this is
* a Thread which has a deprecated stop method.
*/
public void stopUpdater() {
boolean isSelfDestroying = Thread.currentThread() == this;
stopProcessing();
// need to also close the socket for this interrupt to wakeup
// the thread. This fixes bug 35691.
// this.close(); // this should not be done here.
if (this.isAlive()) {
if (logger.isDebugEnabled()) {
logger.debug("{}: Stopping {}" , this.location, this);
}
if (!isSelfDestroying) {
interrupt();
try {
if (socket != null) {
socket.close();
}
}
catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
// dont care...
if (logger.isDebugEnabled()) {
logger.debug(t.getMessage(), t);
}
}
} // !isSelfDestroying
} // isAlive
}
/**
* Signals the run thread to stop, closes underlying resources.
*/
public void close() {
this.continueProcessing.set(false);// = false; // signals we are done.
// Close the socket
// This will also cause the underlying streams to fail.
try {
if (socket != null) {
socket.close();
}
} catch (Exception e) {
// ignore
}
try {
this.stats.close();
} catch (Exception e) {
// ignore
}
// close the helper
try {
if (cacheHelper != null) {
cacheHelper.close();
}
} catch (Exception e) {
// ignore
}
releaseCommBuffer();
}
/**
* Creates a cached {@link Message}object whose state is filled in with a
* message received from the server.
*/
private Message initializeMessage() {
Message _message = new Message(2, Version.CURRENT);
try {
_message.setComms(socket, in, out, commBuffer, this.stats);
}
catch (IOException e) {
if (!quitting()) {
if (logger.isDebugEnabled()){
logger.debug("{}: Caught following exception while attempting to initialize a server-to-client communication socket and will exit", this, e);
}
stopProcessing();
}
}
return _message;
}
/* refinement of method inherited from Thread */
@Override
public String toString() {
return this.getName() + " (" + this.location.getHostName() + ":"
+ this.location.getPort() + ")";
}
/**
* Handle a marker message
*
* @param m
* message containing the data
*/
private void handleMarker(Message m) {
try {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("Received marker message of length ({} bytes)", m.getPayloadLength());
}
this.qManager.getState().processMarker();
if (isDebugEnabled) {
logger.debug("Processed marker message");
}
} catch (Exception e) {
String message =
LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_HANDLE_A_MARKER.toLocalizedString();
handleException(message, e);
}
}
/**
* Create or update an entry
*
* @param m
* message containing the data
*/
private void handleUpdate(Message m) {
String regionName = null;
Object key = null;
Part valuePart = null;
Object newValue = null;
byte[] deltaBytes = null;
Object fullValue = null;
boolean isValueObject = false;
int partCnt = 0;
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
this.isOpCompleted = false;
// Retrieve the data from the put message parts
if (isDebugEnabled) {
logger.debug("Received put message of length ({} bytes)", m.getPayloadLength());
}
Part regionNamePart = m.getPart(partCnt++);
Part keyPart = m.getPart(partCnt++);
boolean isDeltaSent = ((Boolean)m.getPart(partCnt++).getObject())
.booleanValue();
valuePart = m.getPart(partCnt++);
Part callbackArgumentPart = m.getPart(partCnt++);
VersionTag versionTag = (VersionTag)m.getPart(partCnt++).getObject();
if (versionTag != null) {
versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
}
Part isInterestListPassedPart = m.getPart(partCnt++);
Part hasCqsPart = m.getPart(partCnt++);
EventID eventId = (EventID)m.getPart(m.getNumberOfParts() - 1)
.getObject();
boolean withInterest = ((Boolean)isInterestListPassedPart.getObject()).booleanValue();
boolean withCQs = ((Boolean)hasCqsPart.getObject()).booleanValue();
regionName = regionNamePart.getString();
key = keyPart.getStringOrObject();
Object callbackArgument = callbackArgumentPart.getObject();
// Don't automatically deserialize the value.
// Pass it onto the region as a byte[]. If it is a serialized
// object, it will be stored as a CachedDeserializable and
// deserialized only when requested.
boolean isCreate = (m.getMessageType() == MessageType.LOCAL_CREATE);
if (isDebugEnabled) {
logger.debug("Putting entry for region: {} key: {} create: {}{} callbackArgument: {} withInterest={} withCQs={} eventID={} version={}",
regionName, key, isCreate, (valuePart.isObject() ? new StringBuilder(" value: ").append(deserialize(valuePart.getSerializedForm())) : ""),
callbackArgument, withInterest, withCQs, eventId, versionTag);
}
LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
if (!isDeltaSent) {
// bug #42162 - must check for a serialized null here
byte[] serializedForm = valuePart.getSerializedForm();
if (isCreate && InternalDataSerializer.isSerializedNull(serializedForm)) {
//newValue = null; newValue is already null
} else {
newValue = valuePart.getSerializedForm();
}
if (withCQs) {
fullValue = valuePart.getObject();
}
isValueObject = valuePart.isObject();
} else {
deltaBytes = valuePart.getSerializedForm();
isValueObject = true;
}
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("{}: Region named {} does not exist", this, regionName);
}
}
else if (region.hasServerProxy()
&& ServerResponseMatrix.checkForValidStateAfterNotification(region,
key, m.getMessageType()) && (withInterest || !withCQs)) {
EntryEventImpl newEvent = null;
try {
// Create an event and put the entry
newEvent = EntryEventImpl.create(
region,
((m.getMessageType() == MessageType.LOCAL_CREATE) ? Operation.CREATE
: Operation.UPDATE), key, null /* newValue */,
callbackArgument /* callbackArg */, true /* originRemote */,
eventId.getDistributedMember());
newEvent.setVersionTag(versionTag);
newEvent.setFromServer(true);
region.basicBridgeClientUpdate(eventId.getDistributedMember(), key,
newValue, deltaBytes, isValueObject, callbackArgument, m
.getMessageType() == MessageType.LOCAL_CREATE, qManager
.getState().getProcessedMarker()
|| !this.isDurableClient, newEvent, eventId);
this.isOpCompleted = true;
// bug 45520 - ConcurrentCacheModificationException is not thrown and we must check this flag
// if (newEvent.isConcurrencyConflict()) {
// return; // this is logged elsewhere at fine level
// }
if (withCQs && isDeltaSent) {
fullValue = newEvent.getNewValue();
}
} catch (InvalidDeltaException ide) {
Part fullValuePart = requestFullValue(eventId,
"Caught InvalidDeltaException.");
region.getCachePerfStats().incDeltaFullValuesRequested();
fullValue = newValue = fullValuePart.getObject();
isValueObject = Boolean.valueOf(fullValuePart.isObject());
region.basicBridgeClientUpdate(eventId.getDistributedMember(), key,
newValue, null, isValueObject, callbackArgument, m
.getMessageType() == MessageType.LOCAL_CREATE, qManager
.getState().getProcessedMarker()
|| !this.isDurableClient, newEvent, eventId);
this.isOpCompleted = true;
} finally {
if (newEvent != null) newEvent.release();
}
if (isDebugEnabled) {
logger.debug("Put entry for region: {} key: {} callbackArgument: {}", regionName, key, callbackArgument);
}
}
// Update CQs. CQs can exist without client region.
if (withCQs) {
Part numCqsPart = m.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt()/2);
}
partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m
.getMessageType(), key, fullValue, deltaBytes, eventId);
this.isOpCompleted = true;
}
} catch (Exception e) {
String message = LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_PUT_ENTRY_REGION_0_KEY_1_VALUE_2.toLocalizedString(new Object[] { regionName, key, deserialize(valuePart.getSerializedForm())});
handleException(message, e);
}
}
private Part requestFullValue(EventID eventId, String reason) throws Exception {
if (isUsedByTest) {
fullValueRequested = true;
}
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("{} Requesting full value...", reason);
}
Part result = (Part)GetEventValueOp.executeOnPrimary(qManager.getPool(),
eventId, null);
if (result == null) {
// Just log a warning. Do not stop CCU thread.
throw new Exception("Could not retrieve full value for "
+ eventId);
}
if (isDebugEnabled) {
logger.debug("Full value received.");
}
return result;
}
/**
* Invalidate an entry
*
* @param m
* message describing the entry
*/
private void handleInvalidate(Message m) {
String regionName = null;
Object key = null;
int partCnt = 0;
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
this.isOpCompleted = false;
// Retrieve the data from the local-invalidate message parts
if (isDebugEnabled) {
logger.debug("Received invalidate message of length ({} bytes)", m.getPayloadLength());
}
Part regionNamePart = m.getPart(partCnt++);
Part keyPart = m.getPart(partCnt++);
Part callbackArgumentPart = m.getPart(partCnt++);
VersionTag versionTag = (VersionTag)m.getPart(partCnt++).getObject();
if (versionTag != null) {
versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
}
Part isInterestListPassedPart = m.getPart(partCnt++);
Part hasCqsPart = m.getPart(partCnt++);
regionName = regionNamePart.getString();
key = keyPart.getStringOrObject();
Object callbackArgument = callbackArgumentPart.getObject();
boolean withInterest = ((Boolean)isInterestListPassedPart.getObject()).booleanValue();
boolean withCQs = ((Boolean)hasCqsPart.getObject()).booleanValue();
if (isDebugEnabled) {
logger.debug("Invalidating entry for region: {} key: {} callbackArgument: {} withInterest={} withCQs={} version={}",
regionName, key, callbackArgument, withInterest, withCQs, versionTag);
}
LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("Region named {} does not exist", regionName);
}
} else {
if (region.hasServerProxy()
&& (withInterest || !withCQs)) {
try {
Part eid = m.getPart(m.getNumberOfParts() - 1);
EventID eventId = (EventID)eid.getObject();
try {
region.basicBridgeClientInvalidate(eventId.getDistributedMember(), key,
callbackArgument, qManager.getState().getProcessedMarker() || !this.isDurableClient,
eventId, versionTag);
} catch (ConcurrentCacheModificationException e) {
// return; allow CQs to be processed
}
this.isOpCompleted = true;
//fix for 36615
qManager.getState().incrementInvalidatedStats();
if (isDebugEnabled) {
logger.debug("Invalidated entry for region: {} key: {} callbackArgument: {}", regionName, key, callbackArgument);
}
}
catch (EntryNotFoundException e) {
/*ignore*/
if (isDebugEnabled && !quitting()) {
logger.debug("Already invalidated entry for region: {} key: {} callbackArgument: {}", regionName, key, callbackArgument);
}
this.isOpCompleted = true;
}
}
}
if (withCQs) {
// The client may have been registered to receive invalidates for
// create and updates operations. Get the actual region operation.
Part regionOpType = m.getPart(partCnt++);
Part numCqsPart = m.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt() / 2);
}
partCnt = processCqs(m, partCnt, numCqsPart.getInt(), regionOpType.getInt(), key, null);
this.isOpCompleted = true;
}
}
catch (Exception e) {
final String message = LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_INVALIDATE_ENTRY_REGION_0_KEY_1.toLocalizedString(new Object[] { regionName, key });
handleException(message, e);
}
}
/**
* locally destroy an entry
*
* @param m
* message describing the entry
*/
private void handleDestroy(Message m) {
String regionName = null;
Object key = null;
int partCnt = 0;
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
this.isOpCompleted = false;
// Retrieve the data from the local-destroy message parts
if (isDebugEnabled) {
logger.debug("Received destroy message of length ({} bytes)", m.getPayloadLength());
}
Part regionNamePart = m.getPart(partCnt++);
Part keyPart = m.getPart(partCnt++);
Part callbackArgumentPart = m.getPart(partCnt++);
VersionTag versionTag = (VersionTag)m.getPart(partCnt++).getObject();
if (versionTag != null) {
versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
}
regionName = regionNamePart.getString();
key = keyPart.getStringOrObject();
Part isInterestListPassedPart = m.getPart(partCnt++);
Part hasCqsPart = m.getPart(partCnt++);
boolean withInterest = ((Boolean)isInterestListPassedPart.getObject()).booleanValue();
boolean withCQs = ((Boolean)hasCqsPart.getObject()).booleanValue();
Object callbackArgument = callbackArgumentPart.getObject();
if (isDebugEnabled) {
logger.debug("Destroying entry for region: {} key: {} callbackArgument: {} withInterest={} withCQs={} version={}",
regionName, key, callbackArgument, withInterest, withCQs, versionTag);
}
LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
EventID eventId = null;
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("Region named {} does not exist", regionName);
}
}
else if (region.hasServerProxy()
&& (withInterest || !withCQs)) {
try {
Part eid = m.getPart(m.getNumberOfParts() - 1);
eventId = (EventID)eid.getObject();
try {
region.basicBridgeClientDestroy(eventId.getDistributedMember(),
key, callbackArgument,
qManager.getState().getProcessedMarker() || !this.isDurableClient,
eventId, versionTag);
} catch (ConcurrentCacheModificationException e) {
// return; allow CQs to be processed
}
this.isOpCompleted = true;
if (isDebugEnabled) {
logger.debug("Destroyed entry for region: {} key: {} callbackArgument: {}", regionName, key, callbackArgument);
}
}
catch (EntryNotFoundException e) {
/*ignore*/
if (isDebugEnabled && !quitting()) {
logger.debug("Already destroyed entry for region: {} key: {} callbackArgument: {} eventId={}", regionName, key, callbackArgument, eventId.expensiveToString());
}
this.isOpCompleted = true;
}
}
if (withCQs) {
Part numCqsPart = m.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt() / 2);
}
partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), key, null);
this.isOpCompleted = true;
}
}
catch (Exception e) {
String message = LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_DESTROY_ENTRY_REGION_0_KEY_1.toLocalizedString(new Object[] { regionName, key });
handleException(message, e);
}
}
/**
* Locally destroy a region
*
* @param m
* message describing the region
*/
private void handleDestroyRegion(Message m) {
Part regionNamePart = null, callbackArgumentPart = null;
String regionName = null;
Object callbackArgument = null;
LocalRegion region = null;
int partCnt = 0;
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
// Retrieve the data from the local-destroy-region message parts
if (isDebugEnabled) {
logger.debug("Received destroy region message of length ({} bytes)", m.getPayloadLength());
}
regionNamePart = m.getPart(partCnt++);
callbackArgumentPart = m.getPart(partCnt++);
regionName = regionNamePart.getString();
callbackArgument = callbackArgumentPart.getObject();
Part hasCqsPart = m.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Destroying region: {} callbackArgument: {}", regionName, callbackArgument);
}
// Handle CQs if any on this region.
if (((Boolean)hasCqsPart.getObject()).booleanValue()) {
Part numCqsPart = m.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt() / 2);
}
partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null);
}
// Confirm that the region exists
region = (LocalRegion) cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("Region named {} does not exist", regionName);
}
return;
}
// Verify that the region in question should respond to this message
if (region.hasServerProxy()) {
// Locally destroy the region
region.localDestroyRegion(callbackArgument);
if (isDebugEnabled) {
logger.debug("Destroyed region: {} callbackArgument: {}", regionName, callbackArgument);
}
}
} catch (RegionDestroyedException e) { // already destroyed
if (isDebugEnabled) {
logger.debug("region already destroyed: {}", regionName);
}
} catch (Exception e) {
String message = LocalizedStrings.CacheClientUpdater_CAUGHT_AN_EXCEPTION_WHILE_ATTEMPTING_TO_DESTROY_REGION_0.toLocalizedString(regionName);
handleException(message, e);
}
}
/**
* Locally clear a region
*
* @param m
* message describing the region to clear
*/
private void handleClearRegion(Message m) {
String regionName = null;
int partCnt = 0;
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
// Retrieve the data from the clear-region message parts
if (isDebugEnabled) {
logger.debug("{}: Received clear region message of length ({} bytes)", this, m.getPayloadLength());
}
Part regionNamePart = m.getPart(partCnt++);
Part callbackArgumentPart = m.getPart(partCnt++);
Part hasCqsPart = m.getPart(partCnt++);
regionName = regionNamePart.getString();
Object callbackArgument = callbackArgumentPart.getObject();
if (isDebugEnabled) {
logger.debug("Clearing region: {} callbackArgument: {}", regionName, callbackArgument);
}
if (((Boolean) hasCqsPart.getObject()).booleanValue()) {
Part numCqsPart = m.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt() / 2);
}
partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m
.getMessageType(), null, null);
}
// Confirm that the region exists
LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("Region named {} does not exist", regionName);
}
return;
}
// Verify that the region in question should respond to this
// message
if (region.hasServerProxy()) {
// Locally clear the region
region.basicBridgeClientClear(callbackArgument, qManager.getState()
.getProcessedMarker() || !this.isDurableClient);
if (isDebugEnabled) {
logger.debug("Cleared region: {} callbackArgument: {}", regionName, callbackArgument);
}
}
}
catch (Exception e) {
String message = LocalizedStrings.CacheClientUpdater_CAUGHT_THE_FOLLOWING_EXCEPTION_WHILE_ATTEMPTING_TO_CLEAR_REGION_0.toLocalizedString(regionName);
handleException(message, e);
}
}
/**
* Locally invalidate a region
* NOTE: Added as part of bug#38048. The code only takes care of CQ processing.
* Support needs to be added for local region invalidate.
* @param m message describing the region to clear
*/
private void handleInvalidateRegion(Message m) {
String regionName = null;
int partCnt = 0;
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
// Retrieve the data from the invalidate-region message parts
if (isDebugEnabled) {
logger.debug("{}: Received invalidate region message of length ({} bytes)", this, m.getPayloadLength());
}
Part regionNamePart = m.getPart(partCnt++);
partCnt ++; // Part callbackArgumentPart = m.getPart(partCnt++);
Part hasCqsPart = m.getPart(partCnt++);
regionName = regionNamePart.getString();
// Object callbackArgument = callbackArgumentPart.getObject();
if (((Boolean) hasCqsPart.getObject()).booleanValue()) {
Part numCqsPart = m.getPart(partCnt++);
if (isDebugEnabled) {
logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}", numCqsPart.getInt() / 2);
}
partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null);
}
// Confirm that the region exists
LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("Region named {} does not exist", regionName);
}
return;
}
// Verify that the region in question should respond to this
// message
if (region.hasServerProxy()){
return;
// NOTE:
// As explained in the method description, this code is added as part
// of CQ bug fix. Cache server team needs to look into changes relating
// to local region.
//
// Locally invalidate the region
// region.basicBridgeClientInvalidate(callbackArgument,
// proxy.getProcessedMarker());
//if (logger.debugEnabled()) {
// logger.debug(toString() + ": Cleared region: " + regionName
// + " callbackArgument: " + callbackArgument);
//}
}
}
catch (Exception e) {
String message =
LocalizedStrings.CacheClientUpdater_CAUGHT_THE_FOLLOWING_EXCEPTION_WHILE_ATTEMPTING_TO_INVALIDATE_REGION_0.toLocalizedString(regionName);
handleException(message, e);
}
}
/**
* Register instantiators locally
*
* @param msg
* message describing the new instantiators
* @param eventId
* eventId of the instantiators
*/
private void handleRegisterInstantiator(Message msg, EventID eventId) {
String instantiatorClassName = null;
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
int noOfParts = msg.getNumberOfParts();
if (isDebugEnabled) {
logger.debug("{}: Received register instantiators message of parts {}", getName(), noOfParts);
}
Assert.assertTrue((noOfParts - 1) % 3 == 0);
for (int i = 0; i < noOfParts - 1; i = i + 3) {
instantiatorClassName = (String) CacheServerHelper
.deserialize(msg.getPart(i).getSerializedForm());
String instantiatedClassName = (String) CacheServerHelper
.deserialize(msg.getPart(i + 1).getSerializedForm());
int id = msg.getPart(i + 2).getInt();
InternalInstantiator.register(instantiatorClassName, instantiatedClassName, id,
false, eventId, null/* context */);
// distribute is false because we don't want to propagate this to
// servers recursively
}
// // CALLBACK TESTING PURPOSE ONLY ////
if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
BridgeObserver bo = BridgeObserverHolder.getInstance();
bo.afterReceivingFromServer(eventId);
}
// /////////////////////////////////////
}
// TODO bug: can the following catch be more specific?
catch (Exception e) {
if (isDebugEnabled) {
logger.debug("{}: Caught following exception while attempting to read Instantiator : {}", this, instantiatorClassName, e);
}
}
}
private void handleRegisterDataSerializer(Message msg, EventID eventId) {
Class dataSerializerClass = null ;
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
int noOfParts = msg.getNumberOfParts();
// int numOfClasses = noOfParts - 3; // 1 for ds classname, 1 for ds id and 1 for eventId.
if (isDebugEnabled) {
logger.debug("{}: Received register dataserializer message of parts {}", getName(), noOfParts);
}
for (int i = 0; i < noOfParts - 1;) {
try {
String dataSerializerClassName = (String) CacheServerHelper
.deserialize(msg.getPart(i).getSerializedForm());
int id = msg.getPart(i + 1).getInt();
InternalDataSerializer.register(dataSerializerClassName, false, eventId, null/* context */, id);
// distribute is false because we don't want to propagate this to
// servers recursively
int numOfClasses = msg.getPart(i + 2).getInt();
int j = 0;
for (; j < numOfClasses; j++) {
String className = (String)CacheServerHelper.deserialize(msg
.getPart(i + 3 + j).getSerializedForm());
InternalDataSerializer.updateSupportedClassesMap(
dataSerializerClassName, className);
}
i = i + 3 + j;
} catch (ClassNotFoundException e) {
if (isDebugEnabled) {
logger.debug("{}: Caught following exception while attempting to read DataSerializer : {}", this, dataSerializerClass, e);
}
}
}
// // CALLBACK TESTING PURPOSE ONLY ////
if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
BridgeObserver bo = BridgeObserverHolder.getInstance();
bo.afterReceivingFromServer(eventId);
}
///////////////////////////////////////
}
// TODO bug: can the following catch be more specific?
catch (Exception e) {
if (isDebugEnabled) {
logger.debug("{}: Caught following exception while attempting to read DataSerializer : {}", this, dataSerializerClass, e);
}
}
}
/**
* Processes message to invoke CQ listeners.
*
* @param startMessagePart
* @param numCqParts
* @param messageType
* @param key
* @param value
*/
private int processCqs(Message m, int startMessagePart, int numCqParts,
int messageType, Object key, Object value) {
return processCqs(m, startMessagePart, numCqParts, messageType, key, value,
null, null/* eventId */);
}
private int processCqs(Message m, int startMessagePart, int numCqParts,
int messageType, Object key, Object value, byte[] delta, EventID eventId) {
//String[] cqs = new String[numCqs/2];
HashMap cqs = new HashMap();
final boolean isDebugEnabled = logger.isDebugEnabled();
for (int cqCnt=0; cqCnt < numCqParts;) {
StringBuilder str = null;
if (isDebugEnabled) {
str = new StringBuilder(100);
str.append("found these queries: ");
}
try {
// Get CQ Name.
Part cqNamePart = m.getPart(startMessagePart + (cqCnt++));
// Get CQ Op.
Part cqOpPart = m.getPart(startMessagePart + (cqCnt++));
cqs.put(cqNamePart.getString(), Integer.valueOf(cqOpPart.getInt()));
if (str != null) {
str.append(cqNamePart.getString())
.append(" op=").append(cqOpPart.getInt()).append(" ");
}
} catch (Exception ex) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_ERROR_WHILE_PROCESSING_THE_CQ_MESSAGE_PROBLEM_WITH_READING_MESSAGE_FOR_CQ_0, cqCnt));
}
if (isDebugEnabled && str != null) {
logger.debug(str);
}
}
{
CqService cqService = this.cache.getCqService();
try {
cqService.dispatchCqListeners(cqs, messageType, key, value, delta,
qManager, eventId);
}
catch (Exception ex) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_FAILED_TO_INVOKE_CQ_DISPATCHER_ERROR___0, ex.getMessage()));
if (isDebugEnabled) {
logger.debug("Failed to invoke CQ Dispatcher.", ex);
}
}
}
return (startMessagePart + numCqParts);
}
private void handleRegisterInterest(Message m) {
String regionName = null;
Object key = null;
int interestType;
byte interestResultPolicy;
boolean isDurable;
boolean receiveUpdatesAsInvalidates;
int partCnt = 0;
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
// Retrieve the data from the add interest message parts
if (isDebugEnabled) {
logger.debug("{}: Received add interest message of length ({} bytes)", this, m.getPayloadLength());
}
Part regionNamePart = m.getPart(partCnt++);
Part keyPart = m.getPart(partCnt++);
Part interestTypePart = m.getPart(partCnt++);
Part interestResultPolicyPart = m.getPart(partCnt++);
Part isDurablePart = m.getPart(partCnt++);
Part receiveUpdatesAsInvalidatesPart = m.getPart(partCnt++);
regionName = regionNamePart.getString();
key = keyPart.getStringOrObject();
interestType = ((Integer) interestTypePart.getObject()).intValue();
interestResultPolicy = ((Byte) interestResultPolicyPart.getObject()).byteValue();
isDurable = ((Boolean) isDurablePart.getObject()).booleanValue();
receiveUpdatesAsInvalidates = ((Boolean) receiveUpdatesAsInvalidatesPart.getObject()).booleanValue();
// Confirm that region exists
LocalRegion region = (LocalRegion)cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled && !quitting()) {
logger.debug("{}: Region named {} does not exist", this, regionName);
}
return;
}
// Verify that the region in question should respond to this message
if (!region.hasServerProxy()) {
return;
}
if (key instanceof List) {
region.getServerProxy().addListInterest((List) key,
InterestResultPolicy.fromOrdinal(interestResultPolicy), isDurable,
receiveUpdatesAsInvalidates);
} else {
region.getServerProxy().addSingleInterest(key, interestType,
InterestResultPolicy.fromOrdinal(interestResultPolicy), isDurable,
receiveUpdatesAsInvalidates);
}
}
catch (Exception e) {
String message = ": The following exception occurred while attempting to add interest (region: " + regionName
+ " key: " + key + "): ";
handleException(message, e);
}
}
private void handleUnregisterInterest(Message m) {
String regionName = null;
Object key = null;
int interestType;
boolean isDurable;
boolean receiveUpdatesAsInvalidates;
int partCnt = 0;
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
// Retrieve the data from the remove interest message parts
if (isDebugEnabled) {
logger.debug("{}: Received remove interest message of length ({} bytes)", this, m.getPayloadLength());
}
Part regionNamePart = m.getPart(partCnt++);
Part keyPart = m.getPart(partCnt++);
Part interestTypePart = m.getPart(partCnt++);
Part isDurablePart = m.getPart(partCnt++);
Part receiveUpdatesAsInvalidatesPart = m.getPart(partCnt++);
// Not reading the eventId part
regionName = regionNamePart.getString();
key = keyPart.getStringOrObject();
interestType = ((Integer) interestTypePart.getObject()).intValue();
isDurable = ((Boolean) isDurablePart.getObject()).booleanValue();
receiveUpdatesAsInvalidates =
((Boolean) receiveUpdatesAsInvalidatesPart.getObject()).booleanValue();
// Confirm that region exists
LocalRegion region = (LocalRegion)cacheHelper.getRegion(regionName);
if (region == null) {
if (isDebugEnabled) {
logger.debug("{}: Region named {} does not exist", this, regionName);
}
return;
}
// Verify that the region in question should respond to this message
if (!region.hasServerProxy()) {
return;
}
if (key instanceof List) {
region.getServerProxy().removeListInterest((List) key, isDurable,
receiveUpdatesAsInvalidates);
} else {
region.getServerProxy().removeSingleInterest(key, interestType,
isDurable, receiveUpdatesAsInvalidates);
}
}
catch (Exception e) {
String message = ": The following exception occurred while attempting to add interest (region: " + regionName
+ " key: " + key + "): ";
handleException(message, e);
}
}
private void handleTombstoneOperation(Message msg) {
String regionName = "unknown";
try { // not sure why this isn't done by the caller
int partIdx = 0;
// see ClientTombstoneMessage.getGFE70Message
regionName = msg.getPart(partIdx++).getString();
int op = msg.getPart(partIdx++).getInt();
LocalRegion region = (LocalRegion)cacheHelper.getRegion(regionName);
if (region == null) {
if (!quitting()) {
if (logger.isDebugEnabled()) {
logger.debug("{}: Region named {} does not exist", this, regionName);
}
}
return;
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Received tombstone operation for region {} with operation={}", this, region, op);
}
if (!region.getConcurrencyChecksEnabled()) {
return;
}
switch (op) {
case 0:
Map<VersionSource, Long> regionGCVersions =
(Map<VersionSource, Long>)msg.getPart(partIdx++).getObject();
EventID eventID = (EventID)msg.getPart(partIdx++).getObject();
region.expireTombstones(regionGCVersions, eventID, null);
break;
case 1:
Set<Object> removedKeys = (Set<Object>)msg.getPart(partIdx++).getObject();
region.expireTombstoneKeys(removedKeys);
break;
default:
throw new IllegalArgumentException("unknown operation type " + op);
}
} catch (Exception e) {
handleException(": exception while removing tombstones from " + regionName, e);
}
}
/**
* Indicate whether the updater or the system is trying to terminate
*
* @return true if we are trying to stop
*/
private boolean quitting() {
if (isInterrupted()) {
// Any time an interrupt is thrown at this thread, regard it as a
// request to terminate
return true;
}
if (!continueProcessing.get()) {
// de facto flag indicating we are to stop
return true;
}
if (cache != null && cache.getCancelCriterion().cancelInProgress() != null) {
// System is cancelling
return true;
}
// The pool stuff is really sick, so it's possible for us to have a distributed
// system that is not the same as our cache. Check it just in case...
if (system.getCancelCriterion().cancelInProgress() != null) {
return true;
}
// All clear on this end, boss.
return false;
}
private void waitForFailedUpdater() {
boolean gotInterrupted = false;
try {
if (this.failedUpdater != null) {
logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater__0_IS_WAITING_FOR_1_TO_COMPLETE, new Object[] {this, this.failedUpdater}));
while (this.failedUpdater.isAlive()){
if (quitting()) {
return;
}
this.failedUpdater.join(5000);
}
}
}
catch (InterruptedException ie) {
gotInterrupted = true;
return; // just bail, because I have not done anything yet
}
finally {
if (!gotInterrupted && this.failedUpdater != null ) {
logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_0_HAS_COMPLETED_WAITING_FOR_1, new Object[] {this, this.failedUpdater}));
failedUpdater = null;
}
}
}
/**
* Processes messages received from the server.
*
* Only certain types of messages are handled.
*
* @see MessageType#CLIENT_MARKER
* @see MessageType#LOCAL_CREATE
* @see MessageType#LOCAL_UPDATE
* @see MessageType#LOCAL_INVALIDATE
* @see MessageType#LOCAL_DESTROY
* @see MessageType#LOCAL_DESTROY_REGION
* @see MessageType#CLEAR_REGION
* @see ClientUpdateMessage
*/
protected void processMessages() {
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
Part eid = null;
Message _message = initializeMessage();
if (quitting()) {
if (isDebugEnabled) {
logger.debug("processMessages quitting early because we have stopped");
}
// our caller calls close which will notify all waiters for our init
return;
}
logger.info(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_0_READY_TO_PROCESS_MESSAGES, this));
while (continueProcessing.get()) {
// SystemFailure.checkFailure(); dm will check this
if (quitting()) {
if (isDebugEnabled) {
logger.debug("termination detected");
}
// our caller calls close which will notify all waiters for our init
return;
}
// the endpoint died while this thread was sleeping.
if (this.endpoint.isClosed()) {
if (isDebugEnabled) {
logger.debug("endpoint died");
}
this.continueProcessing.set(false);// = false;
break;
}
try {
// Read the message
_message.recv();
// Wait for the previously failed cache client updater
// to finish. This will avoid out of order messages.
waitForFailedUpdater();
cache.waitForRegisterInterestsInProgress();
if (quitting()) {
if (isDebugEnabled) {
logger.debug("processMessages quitting before processing message");
}
break;
}
// If the message is a ping, ignore it
if (_message.getMessageType() == MessageType.SERVER_TO_CLIENT_PING) {
if (isDebugEnabled) {
logger.debug("{}: Received ping", this);
}
continue;
}
boolean isDeltaSent = false;
boolean isCreateOrUpdate = _message.getMessageType() == MessageType.LOCAL_CREATE
|| _message.getMessageType() == MessageType.LOCAL_UPDATE;
if (isCreateOrUpdate) {
isDeltaSent = ((Boolean)_message.getPart(2).getObject())
.booleanValue();
}
// extract the eventId and verify if it is a duplicate event
// if it is a duplicate event, ignore
// @since 5.1
int numberOfParts = _message.getNumberOfParts();
eid = _message.getPart(numberOfParts - 1);
// TODO the message handling methods also deserialized the eventID - inefficient
EventID eventId = (EventID)eid.getObject();
// no need to verify if the instantiator msg is duplicate or not
if (_message.getMessageType() != MessageType.REGISTER_INSTANTIATORS && _message.getMessageType() != MessageType.REGISTER_DATASERIALIZERS ) {
if (this.qManager.getState().verifyIfDuplicate(eventId, !(this.isDurableClient || isDeltaSent))) {
continue;
}
}
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
logger.trace(LogMarker.BRIDGE_SERVER, "Processing event with id {}" + eventId.expensiveToString());
}
this.isOpCompleted = true;
// Process the message
switch (_message.getMessageType()) {
case MessageType.LOCAL_CREATE:
case MessageType.LOCAL_UPDATE:
handleUpdate(_message);
break;
case MessageType.LOCAL_INVALIDATE:
handleInvalidate(_message);
break;
case MessageType.LOCAL_DESTROY:
handleDestroy(_message);
break;
case MessageType.LOCAL_DESTROY_REGION:
handleDestroyRegion(_message);
break;
case MessageType.CLEAR_REGION:
handleClearRegion(_message);
break;
case MessageType.REGISTER_INSTANTIATORS:
handleRegisterInstantiator(_message, eventId);
break;
case MessageType.REGISTER_DATASERIALIZERS:
handleRegisterDataSerializer(_message, eventId);
break;
case MessageType.CLIENT_MARKER:
handleMarker(_message);
break;
case MessageType.INVALIDATE_REGION:
handleInvalidateRegion(_message);
break;
case MessageType.CLIENT_REGISTER_INTEREST:
handleRegisterInterest(_message);
break;
case MessageType.CLIENT_UNREGISTER_INTEREST:
handleUnregisterInterest(_message);
break;
case MessageType.TOMBSTONE_OPERATION:
handleTombstoneOperation(_message);
break;
default:
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_0_RECEIVED_AN_UNSUPPORTED_MESSAGE_TYPE_1, new Object[] {this, MessageType.getString(_message.getMessageType())}));
break;
}
if (this.isOpCompleted && (this.isDurableClient || isDeltaSent)) {
this.qManager.getState().verifyIfDuplicate(eventId, true);
}
// TODO we should maintain the client's "live" view of the server
// but we don't because the server health monitor needs traffic
// originating from the client
// and by updating the last update stat, the ServerMonitor is less
// likely to send pings...
// and the ClientHealthMonitor will cause a disconnect -- mthomas
// 10/18/2006
// this._endpoint.setLastUpdate();
}
catch (InterruptedIOException e) {
// Per Sun's support web site, this exception seems to be peculiar
// to Solaris, and may eventually not even be generated there.
//
// When this exception is thrown, the thread has been interrupted, but
// isInterrupted() is false. (How very odd!)
//
// We regard it the same as an InterruptedException
this.endPointDied = true;
continueProcessing.set(false);// = false;
if (isDebugEnabled) {
logger.debug("InterruptedIOException");
}
}
catch (IOException e) {
this.endPointDied = true;
// Either the server went away, or we caught a closing condition.
if (!quitting()) {
// Server departed; print a message.
String message = ": Caught the following exception and will exit: ";
String errMessage = e.getMessage();
if (errMessage == null) {
errMessage = "";
}
BridgeObserver bo = BridgeObserverHolder.getInstance();
bo.beforeFailoverByCacheClientUpdater(this.location);
eManager.serverCrashed(this.endpoint);
if (isDebugEnabled) {
logger.debug("" + message + e);
}
} // !quitting
// In any event, terminate this thread.
continueProcessing.set(false);// = false;
if (isDebugEnabled) {
logger.debug("terminated due to IOException");
}
}
catch (Exception e) {
if (!quitting()) {
this.endPointDied = true;
BridgeObserver bo = BridgeObserverHolder.getInstance();
bo.beforeFailoverByCacheClientUpdater(this.location);
eManager.serverCrashed(this.endpoint);
String message = ": Caught the following exception and will exit: ";
handleException(message, e);
}
// In any event, terminate this thread.
continueProcessing.set(false);// = false; // force termination
if (isDebugEnabled) {
logger.debug("CCU terminated due to Exception");
}
}
finally {
_message.clear();
}
} // while
}
finally {
if (isDebugEnabled) {
logger.debug("has stopped and cleaning the helper ..");
}
this.close(); // added to fixes some race conditions associated with 38382
//this will make sure that if this thread dies without starting QueueMgr then it will start..
//1. above we ignore InterruptedIOException and this thread dies without informing QueueMgr
//2. if there is some other race codition with continueProcessing flag
this.qManager.checkEndpoint(this, endpoint);
}
}
/**
* Conditionally print a warning describing the failure
* <p>
* Signals run thread to stop. Messages are not printed if the thread or the
* distributed system has already been instructed to terminate.
*
* @param message
* contextual string for the failure
* @param exception
* underlying exception
*/
private void handleException(String message, Exception exception) {
boolean unexpected = !quitting();
// If this was a surprise, print a warning.
if (unexpected && !(exception instanceof CancelException)) {
logger.warn(LocalizedMessage.create(LocalizedStrings.CacheClientUpdater_0__1__2, new Object[] {this, message, exception}), exception);
}
// We can't shutdown the client updater just because of an exception.
// Let the caller decide if we should continue running or not.
}
/**
* Return an object from serialization. Only used in debug logging.
*
* @param serializedBytes
* the serialized form
* @return the deserialized object
*/
private Object deserialize(byte[] serializedBytes) {
Object deserializedObject = serializedBytes;
// This is a debugging method so ignore all exceptions like
// ClassNotFoundException
try {
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
serializedBytes));
deserializedObject = DataSerializer.readObject(dis);
} catch (Exception e) {
}
return deserializedObject;
}
/**
* @return the local port of our {@link #socket}
*/
protected int getLocalPort() {
return socket.getLocalPort();
}
/*
* (non-Javadoc)
*
* @see com.gemstone.gemfire.distributed.internal.InternalDistributedSystem.DisconnectListener#onDisconnect(com.gemstone.gemfire.distributed.internal.InternalDistributedSystem)
*/
public void onDisconnect(InternalDistributedSystem sys) {
stopUpdater();
}
/**
* true if the EndPoint represented by this updater thread has died.
*/
private volatile boolean endPointDied = false;
/**
* Returns true if the end point represented by this updater is considered dead.
* @return true if {@link #endpoint} died.
*/
public boolean isEndPointDead() {
return this.endPointDied;
}
private void verifySocketBufferSize(int requestedBufferSize, int actualBufferSize, String type) {
if (actualBufferSize < requestedBufferSize) {
logger.info(LocalizedMessage.create(
LocalizedStrings.Connection_SOCKET_0_IS_1_INSTEAD_OF_THE_REQUESTED_2,
new Object[] { type + " buffer size", actualBufferSize, requestedBufferSize }));
}
}
/**
* Stats for a CacheClientUpdater. Currently the only thing measured
* are incoming bytes on the wire
* @since 5.7
* @author darrel
*/
public static class CCUStats implements MessageStats {
// static fields
private static final StatisticsType type;
private final static int messagesBeingReceivedId;
private final static int messageBytesBeingReceivedId;
private final static int receivedBytesId;
static {
StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
type = f.createType(
"CacheClientUpdaterStats",
"Statistics about incoming subscription data",
new StatisticDescriptor[] {
f.createLongCounter("receivedBytes",
"Total number of bytes received from the server.",
"bytes"),
f.createIntGauge("messagesBeingReceived",
"Current number of message being received off the network or being processed after reception.",
"messages"),
f.createLongGauge("messageBytesBeingReceived",
"Current number of bytes consumed by messages being received or processed.",
"bytes"),
});
receivedBytesId = type.nameToId("receivedBytes");
messagesBeingReceivedId = type.nameToId("messagesBeingReceived");
messageBytesBeingReceivedId = type.nameToId("messageBytesBeingReceived");
}
// instance fields
private final Statistics stats;
public CCUStats(DistributedSystem ids, ServerLocation location) {
// no need for atomic since only a single thread will be writing these
this.stats = ids.createStatistics(type, "CacheClientUpdater-" + location);
}
public void close() {
this.stats.close();
}
public final void incReceivedBytes(long v) {
this.stats.incLong(receivedBytesId, v);
}
public final void incSentBytes(long v) {
// noop since we never send messages
}
public void incMessagesBeingReceived(int bytes) {
stats.incInt(messagesBeingReceivedId, 1);
if (bytes > 0) {
stats.incLong(messageBytesBeingReceivedId, bytes);
}
}
public void decMessagesBeingReceived(int bytes) {
stats.incInt(messagesBeingReceivedId, -1);
if (bytes > 0) {
stats.incLong(messageBytesBeingReceivedId, -bytes);
}
}
/**
* Returns the current time (ns).
* @return the current time (ns)
*/
public long startTime()
{
return DistributionStats.getStatTime();
}
}
public boolean isProcessing() {
// TODO Auto-generated method stub
return continueProcessing.get();
}
}