blob: dd66d03608f5208bfbf1ed6dddfdbad308ddc8ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.cq.internal;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* Internal CQ cache implementation for CQs operating on replicate regions.
*/
class ServerCQResultsCacheReplicateRegionImpl implements ServerCQResultsCache {
private static final Logger logger = LogService.getLogger();
/**
* To indicate if the CQ results key cache is initialized.
*/
public volatile boolean cqResultKeysInitialized = false;
/**
* This holds the keys that are part of the CQ query results. Using this CQ engine can determine
* whether to execute query on old value from EntryEvent, which is an expensive operation.
*
* NOTE: In case of RR this map is populated and used as intended. In case of PR this map will not
* be populated. If executeCQ happens after update operations this map will remain empty.
*/
private final Map<Object, Object> cqResultKeys;
/**
* This maintains the keys that are destroyed while the Results Cache is getting constructed. This
* avoids any keys that are destroyed (after query execution) but is still part of the CQs result.
*/
private final Set<Object> destroysWhileCqResultsInProgress;
// Synchronize operations on cqResultKeys & destroysWhileCqResultsInProgress
private final Object LOCK = new Object();
public ServerCQResultsCacheReplicateRegionImpl() {
cqResultKeys = new HashMap<>();
destroysWhileCqResultsInProgress = new HashSet<>();
}
@Override
public void setInitialized() {
cqResultKeysInitialized = true;
}
@Override
public boolean isInitialized() {
return cqResultKeysInitialized;
}
@Override
public void add(Object key) {
synchronized (LOCK) {
cqResultKeys.put(key, TOKEN);
if (!isInitialized()) {
// This key could be coming after add, destroy.
// Remove this from destroy queue.
destroysWhileCqResultsInProgress.remove(key);
}
}
}
@Override
public void remove(Object key, boolean isTokenMode) {
synchronized (LOCK) {
if (isTokenMode && cqResultKeys.get(key) != Token.DESTROYED) {
return;
}
cqResultKeys.remove(key);
if (!isInitialized()) {
destroysWhileCqResultsInProgress.add(key);
}
}
}
@Override
public void invalidate() {
synchronized (LOCK) {
cqResultKeys.clear();
cqResultKeysInitialized = false;
}
}
/**
* Returns if the passed key is part of the CQs result set. This method needs to be called once
* the CQ result key caching is completed (cqResultsCacheInitialized is true).
*
* @return true if key is in the Results Cache.
*/
@Override
public boolean contains(Object key) {
// Handle events that may have been deleted,
// but added by result caching.
if (!isInitialized()) {
logger.warn(
"The CQ Result key cache is not initialized. This should not happen as the call to isPartOfCqResult() is based on the condition cqResultsCacheInitialized.");
return false;
}
synchronized (LOCK) {
destroysWhileCqResultsInProgress.forEach(cqResultKeys::remove);
destroysWhileCqResultsInProgress.clear();
}
return cqResultKeys.containsKey(key);
}
/**
* Marks the key as destroyed in the CQ Results key cache.
*/
@Override
public void markAsDestroyed(Object key) {
synchronized (LOCK) {
cqResultKeys.put(key, Token.DESTROYED);
if (!isInitialized()) {
destroysWhileCqResultsInProgress.add(key);
}
}
}
@Override
public int size() {
synchronized (LOCK) {
return cqResultKeys.size();
}
}
/**
* For Test use only.
*
* @return CQ Results Cache.
*/
@Override
public Set<Object> getKeys() {
synchronized (LOCK) {
return Collections.synchronizedSet(new HashSet<>(cqResultKeys.keySet()));
}
}
@Override
public boolean isOldValueRequiredForQueryProcessing(Object key) {
return !isInitialized() || !contains(key);
}
@Override
public boolean isKeyDestroyed(Object key) {
return false;
}
@Override
public void clear() {
// Clean-up the CQ Results Cache.
synchronized (LOCK) {
cqResultKeys.clear();
}
}
}