blob: 39047813e85e579f12458a19690d6761712da78b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import javax.cache.Cache;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheLocalConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
* Common logic for near caches.
public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAdapter<K, V> {
/** */
private static final long serialVersionUID = 0L;
/** */
private static final CachePeekMode[] NEAR_PEEK_MODE = {CachePeekMode.NEAR};
* Empty constructor required for {@link Externalizable}.
protected GridNearCacheAdapter() {
// No-op.
* @param ctx Context.
protected GridNearCacheAdapter(GridCacheContext<K, V> ctx) {
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
if (map == null) {
map = new GridCacheLocalConcurrentMap(
* @return Entry factory.
private GridCacheMapEntryFactory entryFactory() {
return new GridCacheMapEntryFactory() {
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
KeyCacheObject key
) {
return new GridNearCacheEntry(ctx, key);
* @return DHT cache.
public abstract GridDhtCacheAdapter<K, V> dht();
/** {@inheritDoc} */
@Override public void forceKeyCheck() {
/** {@inheritDoc} */
@Override public void onReconnected() {
map = new GridCacheLocalConcurrentMap(
/** {@inheritDoc} */
@Override public boolean isNear() {
return true;
/** {@inheritDoc} */
@Override public GridCachePreloader preloader() {
return dht().preloader();
/** {@inheritDoc} */
@Override public GridCacheMapEntry entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) {
GridNearCacheEntry entry = null;
while (true) {
try {
entry = (GridNearCacheEntry)super.entryEx(key, topVer);
return entry;
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed near entry while initializing from DHT entry (will retry): " + entry);
* @param key Key.
* @param topVer Topology version.
* @return Entry.
public GridNearCacheEntry entryExx(KeyCacheObject key, AffinityTopologyVersion topVer) {
return (GridNearCacheEntry)entryEx(key, topVer);
* @param key Key.
* @return Entry.
@Nullable public GridNearCacheEntry peekExx(KeyCacheObject key) {
return (GridNearCacheEntry)peekEx(key);
/** {@inheritDoc} */
@Override public boolean isLocked(K key) {
return super.isLocked(key) || dht().isLocked(key);
* @param key Key.
* @return If near entry is locked.
public boolean isLockedNearOnly(K key) {
return super.isLocked(key);
* @param keys Keys.
* @return If near entries for given keys are locked.
public boolean isAllLockedNearOnly(Iterable<? extends K> keys) {
A.notNull(keys, "keys");
for (K key : keys)
if (!isLockedNearOnly(key))
return false;
return true;
* @param tx Transaction.
* @param keys Keys to load.
* @param forcePrimary Force primary flag.
* @param subjId Subject ID.
* @param taskName Task name.
* @param deserializeBinary Deserialize binary flag.
* @param expiryPlc Expiry policy.
* @param skipVal Skip value flag.
* @param skipStore Skip store flag.
* @param needVer Need version.
* @return Loaded values.
public IgniteInternalFuture<Map<K, V>> loadAsync(
@Nullable IgniteInternalTx tx,
@Nullable Collection<KeyCacheObject> keys,
boolean forcePrimary,
@Nullable UUID subjId,
String taskName,
boolean deserializeBinary,
boolean recovery,
@Nullable ExpiryPolicy expiryPlc,
boolean skipVal,
boolean skipStore,
boolean needVer
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
IgniteTxLocalEx txx = (tx != null && tx.local()) ? (IgniteTxLocalEx)tx : null;
final IgniteCacheExpiryPolicy expiry = expiryPolicy(expiryPlc);
GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
// init() will register future for responses if future has remote mappings.
return fut;
/** {@inheritDoc} */
@Override public void localLoadCache(IgniteBiPredicate<K, V> p, Object[] args) throws IgniteCheckedException {
dht().localLoadCache(p, args);
/** {@inheritDoc} */
@Override public void localLoad(Collection<? extends K> keys, ExpiryPolicy plc, boolean keepBinary) throws IgniteCheckedException {
dht().localLoad(keys, plc, keepBinary);
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> localLoadCacheAsync(IgniteBiPredicate<K, V> p, Object[] args) {
return dht().localLoadCacheAsync(p, args);
* @param nodeId Sender ID.
* @param res Response.
protected void processGetResponse(UUID nodeId, GridNearGetResponse res) {
CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
fut.onResult(nodeId, res);
/** {@inheritDoc} */
@Override public int size() {
return dht().size();
/** {@inheritDoc} */
@Override public long sizeLong() {
return nearEntries().size() + dht().size();
/** {@inheritDoc} */
@Override public int primarySize() {
return dht().primarySize();
/** {@inheritDoc} */
@Override public long primarySizeLong() {
return dht().primarySizeLong();
/** {@inheritDoc} */
@Override public int nearSize() {
return nearEntries().size();
* @return Near entries.
public Set<Cache.Entry<K, V>> nearEntries() {
final AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion();
return super.entrySet(new CacheEntryPredicateAdapter() {
@Override public boolean apply(GridCacheEntryEx entry) {
GridNearCacheEntry nearEntry = (GridNearCacheEntry)entry;
return !nearEntry.deleted() && nearEntry.visitable(CU.empty0()) && nearEntry.valid(topVer);
/** {@inheritDoc} */
@Override public Set<Cache.Entry<K, V>> entrySet(@Nullable final CacheEntryPredicate... filter) {
CacheEntryPredicate p = new CacheEntryPredicateAdapter() {
@Override public boolean apply(GridCacheEntryEx ex) {
if (ex instanceof GridCacheMapEntry)
return ((GridCacheMapEntry)ex).visitable(filter);
return !ex.deleted() && F.isAll(ex, filter);
return new EntrySet(super.entrySet(p), dht().entrySet(p));
/** {@inheritDoc} */
@Override public boolean evict(K key) {
// Use unary 'and' to make sure that both sides execute.
return super.evict(key) & dht().evict(key);
/** {@inheritDoc} */
@Override public void evictAll(Collection<? extends K> keys) {
/** {@inheritDoc} */
@Override public boolean clearLocally(K key) {
return super.clearLocally(key) | dht().clearLocally(key);
/** {@inheritDoc} */
@Override public void clearLocallyAll(Set<? extends K> keys, boolean srv, boolean near, boolean readers) {
super.clearLocallyAll(keys, srv, near, readers);
dht().clearLocallyAll(keys, srv, near, readers);
/** {@inheritDoc} */
@Override public long offHeapEntriesCount() {
return dht().offHeapEntriesCount();
/** {@inheritDoc} */
@Override public long offHeapAllocatedSize() {
return dht().offHeapAllocatedSize();
/** {@inheritDoc} */
@Override public boolean isIgfsDataCache() {
return dht().isIgfsDataCache();
/** {@inheritDoc} */
@Override public long igfsDataSpaceUsed() {
return dht().igfsDataSpaceUsed();
/** {@inheritDoc} */
@Override public void onIgfsDataSizeChanged(long delta) {
/** {@inheritDoc} */
@Override public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near,
boolean readers) {
assert configuration().getNearConfiguration() != null;
if (ctx.affinityNode()) {
GridCacheVersion obsoleteVer = ctx.versions().next();
List<GridCacheClearAllRunnable<K, V>> dhtJobs = dht().splitClearLocally(srv, near, readers);
List<GridCacheClearAllRunnable<K, V>> res = new ArrayList<>(dhtJobs.size());
for (GridCacheClearAllRunnable<K, V> dhtJob : dhtJobs)
res.add(new GridNearCacheClearAllRunnable<>(this, obsoleteVer, dhtJob));
return res;
return super.splitClearLocally(srv, near, readers);
* Wrapper for entry set.
private class EntrySet extends AbstractSet<Cache.Entry<K, V>> {
/** Near entry set. */
private Set<Cache.Entry<K, V>> nearSet;
/** Dht entry set. */
private Set<Cache.Entry<K, V>> dhtSet;
* @param nearSet Near entry set.
* @param dhtSet Dht entry set.
private EntrySet(Set<Cache.Entry<K, V>> nearSet, Set<Cache.Entry<K, V>> dhtSet) {
assert nearSet != null;
assert dhtSet != null;
this.nearSet = nearSet;
this.dhtSet = dhtSet;
/** {@inheritDoc} */
@NotNull @Override public Iterator<Cache.Entry<K, V>> iterator() {
return new EntryIterator(nearSet.iterator(),
F.iterator0(dhtSet, false, new P1<Cache.Entry<K, V>>() {
@Override public boolean apply(Cache.Entry<K, V> e) {
try {
return GridNearCacheAdapter.super.localPeek(e.getKey(), NEAR_PEEK_MODE, null) == null;
catch (IgniteCheckedException ex) {
throw new IgniteException(ex);
/** {@inheritDoc} */
@Override public int size() {
return F.size(iterator());
* Entry set iterator.
private class EntryIterator implements Iterator<Cache.Entry<K, V>> {
/** */
private Iterator<Cache.Entry<K, V>> dhtIter;
/** */
private Iterator<Cache.Entry<K, V>> nearIter;
/** */
private Iterator<Cache.Entry<K, V>> currIter;
/** */
private Cache.Entry<K, V> currEntry;
* @param nearIter Near set iterator.
* @param dhtIter Dht set iterator.
private EntryIterator(Iterator<Cache.Entry<K, V>> nearIter, Iterator<Cache.Entry<K, V>> dhtIter) {
assert nearIter != null;
assert dhtIter != null;
this.nearIter = nearIter;
this.dhtIter = dhtIter;
currIter = nearIter;
/** {@inheritDoc} */
@Override public boolean hasNext() {
return nearIter.hasNext() || dhtIter.hasNext();
/** {@inheritDoc} */
@Override public Cache.Entry<K, V> next() {
if (!hasNext())
throw new NoSuchElementException();
if (!currIter.hasNext())
currIter = dhtIter;
return currEntry =;
/** {@inheritDoc} */
@Override public void remove() {
if (currEntry == null)
throw new IllegalStateException();
assert currIter != null;
try {
catch (IgniteCheckedException e) {
throw new IgniteException(e);
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearCacheAdapter.class, this);