blob: eab7efc71f593fcd366c60fe21aad60343e106fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.cq.internal;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.ProxyCache;
import org.apache.geode.cache.client.internal.ServerRegionProxy;
import org.apache.geode.cache.client.internal.UserAttributes;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesMutator;
import org.apache.geode.cache.query.CqClosedException;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.CqStatusListener;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.cq.internal.ops.ServerCQProxyImpl;
import org.apache.geode.cache.query.internal.CqStateImpl;
import org.apache.geode.cache.query.internal.cq.ClientCQ;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.security.GemFireSecurityException;
public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
private static final Logger logger = LogService.getLogger();
private CqAttributes cqAttributes = null;
private volatile ServerCQProxyImpl cqProxy;
private ProxyCache proxyCache = null;
/**
* To queue the CQ Events arriving during CQ execution with initial Results.
*/
private volatile ConcurrentLinkedQueue<CqEventImpl> queuedEvents = null;
final Object queuedEventsSynchObject = new Object();
private boolean connected = false;
public ClientCQImpl(CqServiceImpl cqService, String cqName, String queryString,
CqAttributes cqAttributes, ServerCQProxyImpl serverProxy, boolean isDurable) {
super(cqService, cqName, queryString, isDurable);
this.cqAttributes = cqAttributes;
this.cqProxy = serverProxy;
}
@Override
public String getServerCqName() {
return this.cqName;
}
ServerCQProxyImpl getCQProxy() {
return this.cqProxy;
}
/**
* Initializes the connection using the pool from the client region. Also sets the cqBaseRegion
* value of this CQ.
*/
private void initConnectionProxy() throws CqException, RegionNotFoundException {
cqBaseRegion = (LocalRegion) cqService.getCache().getRegion(regionName);
// Check if the region exists on the local cache.
// In the current implementation of 5.1 the Server Connection is (ConnectionProxyImpl)
// is obtained by the Bridge Client/writer/loader on the local region.
if (cqBaseRegion == null) {
throw new RegionNotFoundException(
String.format("Region on which query is specified not found locally, regionName: %s",
regionName));
}
ServerRegionProxy srp = cqBaseRegion.getServerProxy();
if (srp != null) {
if (logger.isTraceEnabled()) {
logger.trace("Found server region proxy on region. RegionName: {}", regionName);
}
this.cqProxy = new ServerCQProxyImpl(srp);
if (!srp.getPool().getSubscriptionEnabled()) {
throw new CqException("The 'queueEnabled' flag on Pool installed on Region " + regionName
+ " is set to false.");
}
} else {
throw new CqException(
"Unable to get the connection pool. The Region does not have a pool configured.");
}
}
@Override
public void close() throws CqClosedException, CqException {
this.close(true);
}
@Override
public void close(boolean sendRequestToServer) throws CqClosedException, CqException {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("Started closing CQ CqName: {} SendRequestToServer: {}", cqName,
sendRequestToServer);
}
// Synchronize with stop and execute CQ commands
synchronized (this.cqState) {
// Check if the cq is already closed.
if (this.isClosed()) {
// throw new CqClosedException("CQ is already closed, CqName : " + this.cqName);
if (isDebugEnabled) {
logger.debug("CQ is already closed, CqName: {}", this.cqName);
}
return;
}
int stateBeforeClosing = this.cqState.getState();
this.cqState.setState(CqStateImpl.CLOSING);
boolean isClosed = false;
// Client Close. Proxy is null in case of server.
// Check if this has been sent to server, if so send
// Close request to server.
Exception exception = null;
if (this.cqProxy != null && sendRequestToServer) {
try {
if (this.proxyCache != null) {
if (this.proxyCache.isClosed()) {
throw proxyCache.getCacheClosedException("Cache is closed for this user.");
}
UserAttributes.userAttributes.set(this.proxyCache.getUserAttributes());
}
cqProxy.close(this);
isClosed = true;
} catch (CancelException e) {
throw e;
} catch (Exception ex) {
if (shutdownInProgress()) {
return;
}
exception = ex;
} finally {
UserAttributes.userAttributes.set(null);
}
}
// Cleanup the resource used by cq.
this.removeFromCqMap();
if (cqProxy == null || !sendRequestToServer || isClosed) {
// Stat update.
if (stateBeforeClosing == CqStateImpl.RUNNING) {
cqService.stats().decCqsActive();
} else if (stateBeforeClosing == CqStateImpl.STOPPED) {
cqService.stats().decCqsStopped();
}
// Set the state to close, and update stats
this.cqState.setState(CqStateImpl.CLOSED);
cqService.stats().incCqsClosed();
cqService.stats().decCqsOnClient();
if (this.stats != null)
this.stats.close();
} else {
if (shutdownInProgress()) {
return;
}
// Hasn't able to send close request to any server.
if (exception != null) {
throw new CqException(
String.format("Failed to close the cq. CqName: %s. Error from last endpoint: %s",
this.cqName, exception.getLocalizedMessage()),
exception.getCause());
} else {
throw new CqException(
String.format(
"Failed to close the cq. CqName: %s. The server endpoints on which this cq was registered were not found.",
this.cqName));
}
}
}
// Invoke close on Listeners if any.
if (this.cqAttributes != null) {
CqListener[] cqListeners = this.getCqAttributes().getCqListeners();
if (cqListeners != null) {
if (isDebugEnabled) {
logger.debug(
"Invoking CqListeners close() api for the CQ, CqName: {} Number of CqListeners: {}",
cqName, cqListeners.length);
}
for (int lCnt = 0; lCnt < cqListeners.length; lCnt++) {
try {
cqListeners[lCnt].close();
// Handle client side exceptions.
} catch (Exception ex) {
logger.warn("Exception occurred in the CqListener of the CQ, CqName : {} Error : {}",
new Object[] {cqName, ex.getLocalizedMessage()});
if (isDebugEnabled) {
logger.debug(ex.getMessage(), ex);
}
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.warn(
"RuntimeException occurred in the CqListener of the CQ, CqName : {} Error : {}",
new Object[] {cqName, t.getLocalizedMessage()});
if (isDebugEnabled) {
logger.debug(t.getMessage(), t);
}
}
}
}
}
if (isDebugEnabled) {
logger.debug("Successfully closed the CQ. {}", cqName);
}
}
/**
* Clears the resource used by CQ.
*/
@Override
protected void cleanup() throws CqException {
this.cqService.removeFromBaseRegionToCqNameMap(this.regionName, this.getServerCqName());
}
@Override
public CqAttributes getCqAttributes() {
return cqAttributes;
}
/**
* @return Returns the cqListeners.
*/
public CqListener[] getCqListeners() {
return cqAttributes.getCqListeners();
}
/**
* Start or resume executing the query.
*/
@Override
public void execute() throws CqClosedException, RegionNotFoundException, CqException {
executeCqOnRedundantsAndPrimary(false);
}
/**
* Start or resume executing the query. Gets or updates the CQ results and returns them.
*/
@Override
public <E> CqResults<E> executeWithInitialResults()
throws CqClosedException, RegionNotFoundException, CqException {
synchronized (queuedEventsSynchObject) {
// Prevents multiple calls to executeWithInitialResults from squishing
// each others queuedEvents. Forces the second call to wait
// until first call is completed.
while (queuedEvents != null) {
try {
queuedEventsSynchObject.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// At this point we know queuedEvents is null and no one is adding to queuedEvents yet.
this.queuedEvents = new ConcurrentLinkedQueue<CqEventImpl>();
}
if (CqQueryImpl.testHook != null) {
testHook.pauseUntilReady();
}
// Send CQ request to servers.
// If an exception is thrown, we need to clean up the queuedEvents
// or else client will hang on next executeWithInitialResults
CqResults initialResults;
try {
initialResults = (CqResults) executeCqOnRedundantsAndPrimary(true);
} catch (RegionNotFoundException | CqException | RuntimeException e) {
queuedEvents = null;
throw e;
}
// lock was released earlier so that events that are received while executing
// initial results can be added to the queue.
synchronized (queuedEventsSynchObject) {
// Check if any CQ Events are received during query execution.
// Invoke the CQ Listeners with the received CQ Events.
try {
if (!this.queuedEvents.isEmpty()) {
try {
Thread thread = new LoggingThread("CQEventHandler For " + cqName, () -> {
Object[] eventArray = null;
if (CqQueryImpl.testHook != null) {
testHook.setEventCount(queuedEvents.size());
}
// Synchronization for the executer thread.
synchronized (queuedEventsSynchObject) {
try {
eventArray = queuedEvents.toArray();
// Process through the events
for (Object cqEvent : eventArray) {
cqService.invokeListeners(cqName, ClientCQImpl.this, (CqEventImpl) cqEvent);
stats.decQueuedCqListenerEvents();
}
} finally {
// Make sure that we notify waiting threads or else possible dead lock
queuedEvents.clear();
queuedEvents = null;
queuedEventsSynchObject.notify();
}
}
});
thread.start();
} catch (Exception ex) {
if (logger.isDebugEnabled()) {
logger.debug("Exception while invoking the CQ Listener with queued events.", ex);
}
}
} else {
queuedEvents = null;
}
} finally {
queuedEventsSynchObject.notify();
}
return initialResults;
}
}
/**
* This executes the CQ first on the redundant server and then on the primary server. This is
* required to keep the redundancy behavior in accordance with the HAQueue expectation (wherein
* the events are delivered only from the primary).
*
* @param executeWithInitialResults boolean
* @return Object SelectResults in case of executeWithInitialResults
*/
private Object executeCqOnRedundantsAndPrimary(boolean executeWithInitialResults)
throws CqClosedException, RegionNotFoundException, CqException {
Object initialResults = null;
synchronized (this.cqState) {
if (this.isClosed()) {
throw new CqClosedException(
String.format("CQ is closed, CqName : %s", this.cqName));
}
if (this.isRunning()) {
throw new IllegalStateException(String.format("CQ is in running state, CqName : %s",
this.cqName));
}
if (logger.isDebugEnabled()) {
logger.debug("Performing Execute {} request for CQ. CqName: {}",
((executeWithInitialResults) ? "WithInitialResult" : ""), this.cqName);
}
this.cqBaseRegion = (LocalRegion) cqService.getCache().getRegion(this.regionName);
// If not server send the request to server.
if (!cqService.isServer()) {
// Send execute request to the server.
// If CqService is initialized using the pool, the connection proxy is set using the
// pool that initializes the CQ. Else its set using the Region proxy.
if (this.cqProxy == null) {
initConnectionProxy();
}
boolean success = false;
try {
if (this.proxyCache != null) {
if (this.proxyCache.isClosed()) {
throw proxyCache.getCacheClosedException("Cache is closed for this user.");
}
UserAttributes.userAttributes.set(this.proxyCache.getUserAttributes());
}
if (executeWithInitialResults) {
initialResults = cqProxy.createWithIR(this);
if (initialResults == null) {
String errMsg = "Failed to execute the CQ. CqName: " + this.cqName
+ ", Query String is: " + this.queryString;
throw new CqException(errMsg);
}
} else {
cqProxy.create(this);
}
success = true;
} catch (Exception ex) {
// Check for system shutdown.
if (this.shutdownInProgress()) {
throw new CqException("System shutdown in progress.");
}
if (ex.getCause() instanceof GemFireSecurityException) {
if (securityLogWriter.warningEnabled()) {
securityLogWriter.warning(
String.format("Exception while executing cq Exception: %s", ex));
}
throw new CqException(ex.getCause().getMessage(), ex.getCause());
} else if (ex instanceof CqException) {
throw (CqException) ex;
} else {
String errMsg =
String.format(
"Failed to execute the CQ. CqName: %s, Query String is: %s, Error from last server: %s",
this.cqName, this.queryString, ex.getLocalizedMessage());
if (logger.isDebugEnabled()) {
logger.debug(errMsg, ex);
}
throw new CqException(errMsg, ex);
}
} finally {
if (!success && !this.shutdownInProgress()) {
try {
cqProxy.close(this);
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Exception cleaning up failed cq", e);
}
UserAttributes.userAttributes.set(null);
}
}
UserAttributes.userAttributes.set(null);
}
}
this.cqState.setState(CqStateImpl.RUNNING);
}
// If client side, alert listeners that a cqs have been connected
if (!cqService.isServer()) {
connected = true;
CqListener[] cqListeners = getCqAttributes().getCqListeners();
for (int lCnt = 0; lCnt < cqListeners.length; lCnt++) {
if (cqListeners[lCnt] != null) {
if (cqListeners[lCnt] instanceof CqStatusListener) {
CqStatusListener listener = (CqStatusListener) cqListeners[lCnt];
listener.onCqConnected();
}
}
}
}
// Update CQ-base region for book keeping.
this.cqService.stats().incCqsActive();
this.cqService.stats().decCqsStopped();
return initialResults;
}
/**
* Check to see if shutdown in progress.
*
* @return true if shutdown in progress else false.
*/
private boolean shutdownInProgress() {
InternalCache cache = cqService.getInternalCache();
if (cache == null || cache.isClosed()) {
return true; // bail, things are shutting down
}
String reason = cqProxy.getPool().getCancelCriterion().cancelInProgress();
if (reason != null) {
return true;
}
return false;
}
/**
* Stop or pause executing the query.
*/
@Override
public void stop() throws CqClosedException, CqException {
boolean isStopped = false;
synchronized (this.cqState) {
if (this.isClosed()) {
throw new CqClosedException(
String.format("CQ is closed, CqName : %s", this.cqName));
}
if (!(this.isRunning())) {
throw new IllegalStateException(
String.format("CQ is not in running state, stop CQ does not apply, CqName : %s",
this.cqName));
}
Exception exception = null;
try {
if (this.proxyCache != null) {
if (this.proxyCache.isClosed()) {
throw proxyCache.getCacheClosedException("Cache is closed for this user.");
}
UserAttributes.userAttributes.set(this.proxyCache.getUserAttributes());
}
cqProxy.stop(this);
isStopped = true;
} catch (Exception e) {
exception = e;
} finally {
UserAttributes.userAttributes.set(null);
}
if (cqProxy == null || isStopped) {
// Change state and stats on the client side
this.cqState.setState(CqStateImpl.STOPPED);
this.cqService.stats().incCqsStopped();
this.cqService.stats().decCqsActive();
if (logger.isDebugEnabled()) {
logger.debug("Successfully stopped the CQ. {}", cqName);
}
} else {
// Hasn't able to send stop request to any server.
if (exception != null) {
throw new CqException(
String.format("Failed to stop the cq. CqName :%s Error from last server: %s",
this.cqName, exception.getLocalizedMessage()),
exception.getCause());
} else {
throw new CqException(
String.format(
"Failed to stop the cq. CqName: %s. The server endpoints on which this cq was registered were not found.",
this.cqName));
}
}
}
}
@Override
public CqAttributesMutator getCqAttributesMutator() {
return (CqAttributesMutator) this.cqAttributes;
}
ConcurrentLinkedQueue<CqEventImpl> getQueuedEvents() {
return this.queuedEvents;
}
@Override
public void setProxyCache(ProxyCache proxyCache) {
this.proxyCache = proxyCache;
}
boolean isConnected() {
return connected;
}
void setConnected(boolean connected) {
this.connected = connected;
}
@Override
public void createOn(Connection conn, boolean isDurable) {
byte regionDataPolicyOrdinal = getCqBaseRegion() == null ? (byte) 0
: getCqBaseRegion().getAttributes().getDataPolicy().ordinal;
int state = this.cqState.getState();
this.cqProxy.createOn(getName(), conn, getQueryString(), state, isDurable,
regionDataPolicyOrdinal);
}
}