blob: ee849168ec978ae9b2dbbb39c4d0301796f45e0d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.io.Externalizable;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import javax.cache.Cache;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheLocalConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* Common logic for near caches (smaller local cache that stores most recently or most frequently accessed data).
*/
public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAdapter<K, V> {
/** */
private static final long serialVersionUID = 0L;
/** */
private static final CachePeekMode[] NEAR_PEEK_MODE = {CachePeekMode.NEAR};
/**
* Empty constructor required for {@link Externalizable}.
*/
protected GridNearCacheAdapter() {
// No-op.
}
/**
* @param ctx Context.
*/
protected GridNearCacheAdapter(GridCacheContext<K, V> ctx) {
super(ctx);
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
if (map == null) {
map = new GridCacheLocalConcurrentMap(
ctx,
entryFactory(),
ctx.config().getNearConfiguration().getNearStartSize());
}
}
/**
* @return Entry factory.
*/
private GridCacheMapEntryFactory entryFactory() {
return new GridCacheMapEntryFactory() {
@Override public GridCacheMapEntry create(
GridCacheContext ctx,
AffinityTopologyVersion topVer,
KeyCacheObject key
) {
return new GridNearCacheEntry(ctx, key);
}
};
}
/**
* @return DHT cache.
*/
public abstract GridDhtCacheAdapter<K, V> dht();
/** {@inheritDoc} */
@Override public void onReconnected() {
map = new GridCacheLocalConcurrentMap(
ctx,
entryFactory(),
ctx.config().getNearConfiguration().getNearStartSize());
}
/** {@inheritDoc} */
@Override public boolean isNear() {
return true;
}
/** {@inheritDoc} */
@Override public GridCachePreloader preloader() {
return dht().preloader();
}
/** {@inheritDoc} */
@Override public GridCacheMapEntry entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) {
GridNearCacheEntry entry = null;
while (true) {
try {
entry = (GridNearCacheEntry)super.entryEx(key, topVer);
entry.initializeFromDht(topVer);
return entry;
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed near entry while initializing from DHT entry (will retry): " + entry);
}
}
}
/**
* @param key Key.
* @param topVer Topology version.
* @return Entry.
*/
public GridNearCacheEntry entryExx(KeyCacheObject key, AffinityTopologyVersion topVer) {
return (GridNearCacheEntry)entryEx(key, topVer);
}
/**
* @param key Key.
* @return Entry.
*/
@Nullable public GridNearCacheEntry peekExx(KeyCacheObject key) {
return (GridNearCacheEntry)peekEx(key);
}
/** {@inheritDoc} */
@Override public boolean isLocked(K key) {
return super.isLocked(key) || dht().isLocked(key);
}
/**
* @param key Key.
* @return If near entry is locked.
*/
public boolean isLockedNearOnly(K key) {
return super.isLocked(key);
}
/**
* @param keys Keys.
* @return If near entries for given keys are locked.
*/
public boolean isAllLockedNearOnly(Iterable<? extends K> keys) {
A.notNull(keys, "keys");
for (K key : keys)
if (!isLockedNearOnly(key))
return false;
return true;
}
/**
* @param tx Transaction.
* @param keys Keys to load.
* @param forcePrimary Force primary flag.
* @param taskName Task name.
* @param deserializeBinary Deserialize binary flag.
* @param expiryPlc Expiry policy.
* @param skipVal Skip value flag.
* @param skipStore Skip store flag.
* @param needVer Need version.
* @return Loaded values.
*/
public IgniteInternalFuture<Map<K, V>> loadAsync(
@Nullable IgniteInternalTx tx,
@Nullable Collection<KeyCacheObject> keys,
boolean forcePrimary,
String taskName,
boolean deserializeBinary,
boolean recovery,
@Nullable ExpiryPolicy expiryPlc,
boolean skipVal,
boolean skipStore,
boolean needVer
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
IgniteTxLocalEx txx = (tx != null && tx.local()) ? (IgniteTxLocalEx)tx : null;
final IgniteCacheExpiryPolicy expiry = expiryPolicy(expiryPlc);
GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
keys,
!skipStore,
forcePrimary,
txx,
taskName,
deserializeBinary,
expiry,
skipVal,
needVer,
false,
recovery);
// init() will register future for responses if future has remote mappings.
fut.init(null);
return fut;
}
/** {@inheritDoc} */
@Override public void localLoadCache(IgniteBiPredicate<K, V> p, Object[] args) throws IgniteCheckedException {
dht().localLoadCache(p, args);
}
/** {@inheritDoc} */
@Override public void localLoad(Collection<? extends K> keys, ExpiryPolicy plc, boolean keepBinary) throws IgniteCheckedException {
dht().localLoad(keys, plc, keepBinary);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> localLoadCacheAsync(IgniteBiPredicate<K, V> p, Object[] args) {
return dht().localLoadCacheAsync(p, args);
}
/**
* @param nodeId Sender ID.
* @param res Response.
*/
protected void processGetResponse(UUID nodeId, GridNearGetResponse res) {
CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
return;
}
fut.onResult(nodeId, res);
}
/** {@inheritDoc} */
@Override public int size() {
return dht().size();
}
/** {@inheritDoc} */
@Override public long sizeLong() {
return nearEntries().size() + dht().size();
}
/** {@inheritDoc} */
@Override public int primarySize() {
return dht().primarySize();
}
/** {@inheritDoc} */
@Override public long primarySizeLong() {
return dht().primarySizeLong();
}
/** {@inheritDoc} */
@Override public int nearSize() {
return nearEntries().size();
}
/**
* @return Near entries.
*/
public Set<Cache.Entry<K, V>> nearEntries() {
final AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion();
return super.entrySet(new CacheEntryPredicateAdapter() {
@Override public boolean apply(GridCacheEntryEx entry) {
GridNearCacheEntry nearEntry = (GridNearCacheEntry)entry;
return !nearEntry.deleted() && nearEntry.visitable(CU.empty0()) && nearEntry.valid(topVer);
}
});
}
/** {@inheritDoc} */
@Override public Set<Cache.Entry<K, V>> entrySet(@Nullable final CacheEntryPredicate... filter) {
CacheEntryPredicate p = new CacheEntryPredicateAdapter() {
@Override public boolean apply(GridCacheEntryEx ex) {
if (ex instanceof GridCacheMapEntry)
return ((GridCacheMapEntry)ex).visitable(filter);
else
return !ex.deleted() && F.isAll(ex, filter);
}
};
return new EntrySet(super.entrySet(p), dht().entrySet(p));
}
/** {@inheritDoc} */
@Override public boolean evict(K key) {
// Use unary 'and' to make sure that both sides execute.
return super.evict(key) & dht().evict(key);
}
/** {@inheritDoc} */
@Override public void evictAll(Collection<? extends K> keys) {
super.evictAll(keys);
dht().evictAll(keys);
}
/** {@inheritDoc} */
@Override public boolean clearLocally(K key) {
return super.clearLocally(key) | dht().clearLocally(key);
}
/** {@inheritDoc} */
@Override public void clearLocallyAll(Set<? extends K> keys, boolean srv, boolean near, boolean readers) {
super.clearLocallyAll(keys, srv, near, readers);
dht().clearLocallyAll(keys, srv, near, readers);
}
/** {@inheritDoc} */
@Override public long offHeapEntriesCount() {
return dht().offHeapEntriesCount();
}
/** {@inheritDoc} */
@Override public long offHeapAllocatedSize() {
return dht().offHeapAllocatedSize();
}
/** {@inheritDoc} */
@Override public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near,
boolean readers) {
assert configuration().getNearConfiguration() != null;
if (ctx.affinityNode()) {
GridCacheVersion obsoleteVer = nextVersion();
List<GridCacheClearAllRunnable<K, V>> dhtJobs = dht().splitClearLocally(srv, near, readers);
List<GridCacheClearAllRunnable<K, V>> res = new ArrayList<>(dhtJobs.size());
for (GridCacheClearAllRunnable<K, V> dhtJob : dhtJobs)
res.add(new GridNearCacheClearAllRunnable<>(this, obsoleteVer, dhtJob));
return res;
}
else
return super.splitClearLocally(srv, near, readers);
}
/**
* Wrapper for entry set.
*/
private class EntrySet extends AbstractSet<Cache.Entry<K, V>> {
/** Near entry set. */
private Set<Cache.Entry<K, V>> nearSet;
/** Dht entry set. */
private Set<Cache.Entry<K, V>> dhtSet;
/**
* @param nearSet Near entry set.
* @param dhtSet Dht entry set.
*/
private EntrySet(Set<Cache.Entry<K, V>> nearSet, Set<Cache.Entry<K, V>> dhtSet) {
assert nearSet != null;
assert dhtSet != null;
this.nearSet = nearSet;
this.dhtSet = dhtSet;
}
/** {@inheritDoc} */
@NotNull @Override public Iterator<Cache.Entry<K, V>> iterator() {
return new EntryIterator(nearSet.iterator(),
F.iterator0(dhtSet, false, new P1<Cache.Entry<K, V>>() {
@Override public boolean apply(Cache.Entry<K, V> e) {
try {
return GridNearCacheAdapter.super.localPeek(e.getKey(), NEAR_PEEK_MODE) == null;
}
catch (IgniteCheckedException ex) {
throw new IgniteException(ex);
}
}
}));
}
/** {@inheritDoc} */
@Override public int size() {
return F.size(iterator());
}
}
/**
* Entry set iterator.
*/
private class EntryIterator implements Iterator<Cache.Entry<K, V>> {
/** */
private Iterator<Cache.Entry<K, V>> dhtIter;
/** */
private Iterator<Cache.Entry<K, V>> nearIter;
/** */
private Iterator<Cache.Entry<K, V>> currIter;
/** */
private Cache.Entry<K, V> currEntry;
/**
* @param nearIter Near set iterator.
* @param dhtIter Dht set iterator.
*/
private EntryIterator(Iterator<Cache.Entry<K, V>> nearIter, Iterator<Cache.Entry<K, V>> dhtIter) {
assert nearIter != null;
assert dhtIter != null;
this.nearIter = nearIter;
this.dhtIter = dhtIter;
currIter = nearIter;
}
/** {@inheritDoc} */
@Override public boolean hasNext() {
return nearIter.hasNext() || dhtIter.hasNext();
}
/** {@inheritDoc} */
@Override public Cache.Entry<K, V> next() {
if (!hasNext())
throw new NoSuchElementException();
if (!currIter.hasNext())
currIter = dhtIter;
return currEntry = currIter.next();
}
/** {@inheritDoc} */
@Override public void remove() {
if (currEntry == null)
throw new IllegalStateException();
assert currIter != null;
currIter.remove();
try {
GridNearCacheAdapter.this.getAndRemove(currEntry.getKey());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearCacheAdapter.class, this);
}
}