blob: 05f8932fec61b4d524bd7ea2872730c53b2080b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.internal.cq;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.logging.log4j.Logger;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.CacheCallback;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesMutator;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.logging.internal.log4j.api.LogService;
public class CqAttributesImpl implements CqAttributes, CqAttributesMutator, Cloneable {
private static final Logger logger = LogService.getLogger();
private volatile ArrayList<CqListener> cqListeners = null;
@Immutable
private static final CqListener[] EMPTY_LISTENERS = new CqListener[0];
/**
* Used to synchronize access to cqListeners
*/
private final Object clSync = new Object();
/**
* Returns the CqListeners set with the CQ
*
* @return CqListener[]
*/
@Override
public CqListener[] getCqListeners() {
final ArrayList listeners = this.cqListeners;
if (listeners == null) {
return CqAttributesImpl.EMPTY_LISTENERS;
}
CqListener[] result = null;
synchronized (this.clSync) {
result = new CqListener[listeners.size()];
listeners.toArray(result);
}
return result;
}
/**
* Returns the CqListener set with the CQ
*
*/
@Override
public CqListener getCqListener() {
ArrayList<CqListener> listeners = this.cqListeners;
if (listeners == null) {
return null;
}
synchronized (this.clSync) {
if (listeners.size() == 0) {
return null;
}
if (listeners.size() == 1) {
return listeners.get(0);
}
}
throw new IllegalStateException(
"More than one Cqlistener exists.");
}
@Override
public Object clone() {
try {
return super.clone();
} catch (CloneNotSupportedException e) {
throw new InternalError(
"CloneNotSupportedException thrown in class that implements cloneable");
}
}
/**
* Adds a Cqlistener to the end of the list of Cqlisteners on this CqQuery.
*
* @param cql the user defined cq listener to add to the CqQuery.
* @throws IllegalArgumentException if <code>aListener</code> is null
*/
@Override
public void addCqListener(CqListener cql) {
if (cql == null) {
throw new IllegalArgumentException(
"addCqListener parameter was null");
}
synchronized (this.clSync) {
ArrayList<CqListener> oldListeners = this.cqListeners;
if (oldListeners == null || oldListeners.size() == 0) {
ArrayList<CqListener> al = new ArrayList<CqListener>(1);
al.add(cql);
this.setCqListeners(al);
} else {
if (!oldListeners.contains(cql)) {
oldListeners.add(cql);
}
}
}
}
/**
* Removes all Cqlisteners, calling on each of them, and then adds each listener in the specified
* array.
*
* @param addedListeners a possibly null or empty array of listeners to add to this CqQuery.
* @throws IllegalArgumentException if the <code>newListeners</code> array has a null element
*/
@Override
public void initCqListeners(CqListener[] addedListeners) {
ArrayList<CqListener> oldListeners;
synchronized (this.clSync) {
oldListeners = this.cqListeners;
if (addedListeners == null || addedListeners.length == 0) {
this.setCqListeners(null);
} else { // we have some listeners to add
List nl = Arrays.asList(addedListeners);
if (nl.contains(null)) {
throw new IllegalArgumentException(
"initCqListeners parameter had a null element");
}
this.setCqListeners(new ArrayList(nl));
}
}
if (oldListeners != null) {
CqListener cql = null;
for (Iterator<CqListener> iter = oldListeners.iterator(); iter.hasNext();) {
try {
cql = iter.next();
cql.close();
// Handle client side exceptions.
} catch (Exception ex) {
logger.warn("Exception occurred while closing CQ Listener Error: {}",
ex.getLocalizedMessage());
if (logger.isDebugEnabled()) {
logger.debug(ex.getMessage(), ex);
}
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.warn("Runtime Exception occurred while closing CQ Listener Error: {}",
t.getLocalizedMessage());
if (logger.isDebugEnabled()) {
logger.debug(t.getMessage(), t);
}
}
}
}
}
/**
* Removes a Cqlistener from the list of Cqlisteners on this CqQuery. Does nothing if the
* specified listener has not been added. If the specified listener has been added then
* {@link CacheCallback#close()} will be called on it; otherwise does nothing.
*
* @param cql the Cqlistener to remove from the CqQuery.
* @throws IllegalArgumentException if <code>cl</code> is null
*/
@Override
public void removeCqListener(CqListener cql) {
if (cql == null) {
throw new IllegalArgumentException(
"removeCqListener parameter was null");
}
synchronized (this.clSync) {
ArrayList<CqListener> oldListeners = this.cqListeners;
if (oldListeners != null) {
if (oldListeners.remove(cql)) {
if (oldListeners.isEmpty()) {
this.setCqListeners(null);
}
try {
cql.close();
// Handle client side exceptions.
} catch (Exception ex) {
logger.warn("Exception closing CQ Listener Error: {}",
ex.getLocalizedMessage());
if (logger.isDebugEnabled()) {
logger.debug(ex.getMessage(), ex);
}
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.warn("Runtime Exception occurred closing CQ Listener Error: {}",
t.getLocalizedMessage());
if (logger.isDebugEnabled()) {
logger.debug(t.getMessage(), t);
}
}
}
}
}
}
public void setCqListeners(ArrayList<CqListener> cqListeners) {
synchronized (this.clSync) {
this.cqListeners = cqListeners;
}
}
}