blob: a948dab2e053582c616aff9da8cade94f7ccf494 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheLockCandidates;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvcc;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
/**
* Near cache entry.
*/
@SuppressWarnings({"TooBroadScope"})
public class GridNearCacheEntry extends GridDistributedCacheEntry {
/** Fixed size overhead that {@link #memorySize()} adds on top of the base entry size. */
private static final int NEAR_SIZE_OVERHEAD = 36 + 16;
/** Topology version at the moment when value was initialized from primary node. */
private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
/** DHT version which caused the last update. */
private GridCacheVersion dhtVer;
/** Partition. */
private int part;
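/** Number of outstanding eviction reservations; {@link #evictionDisabled()} reports {@code true} while positive. */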
private short evictReservations;
/**
 * Creates a near entry for the given key and remembers the partition the key maps to.
 *
 * @param ctx Cache context.
 * @param key Cache key.
 */
public GridNearCacheEntry(GridCacheContext ctx, KeyCacheObject key) {
    super(ctx, key);

    // Partition is resolved once at construction time and served by partition() afterwards.
    this.part = ctx.affinity().partition(key);
}
/**
 * {@inheritDoc}
 * <p>
 * The result is the size reported by the superclass plus {@code NEAR_SIZE_OVERHEAD}.
 */
@Override public int memorySize() throws IgniteCheckedException {
    int baseSize = super.memorySize();

    return baseSize + NEAR_SIZE_OVERHEAD;
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the value held in the {@code part} field.
 */
@Override public int partition() {
    return this.part;
}
/**
 * {@inheritDoc}
 * <p>
 * Always {@code true} for this entry type.
 */
@Override public boolean isNear() {
    return true;
}
/**
 * {@inheritDoc}
 * <p>
 * The entry is valid when the recorded topology version equals {@code topVer}, or when it is older than
 * {@code topVer} and neither the primary for this partition changed in between nor the local node became
 * a backup for the partition. On success the recorded version is advanced to {@code topVer}; when the entry
 * is found stale (or affinity history is unavailable) the recorded version is reset to
 * {@link AffinityTopologyVersion#NONE}.
 */
@Override public boolean valid(AffinityTopologyVersion topVer) {
    assert topVer.topologyVersion() > 0 : "Topology version is invalid: " + topVer;

    AffinityTopologyVersion recorded = this.topVer;

    if (recorded.equals(topVer))
        return true;

    // Never initialized, or the caller asks about a version older than the recorded one.
    if (recorded.equals(AffinityTopologyVersion.NONE) || topVer.compareTo(recorded) < 0)
        return false;

    boolean stale;

    try {
        // Second operand: as a result of topology change, this node is now a backup for the key -
        // no more need for a Near entry.
        stale = cctx.affinity().primaryChanged(partition(), recorded, topVer)
            || cctx.affinity().backupByPartition(cctx.localNode(), part, topVer);
    }
    catch (IllegalStateException ignore) {
        // Do not have affinity history.
        stale = true;
    }

    this.topVer = stale ? AffinityTopologyVersion.NONE : topVer;

    return !stale;
}
/**
 * Initializes this near entry from the DHT entry peeked for the same key, if such an entry and its info exist.
 * The value is copied only when this entry is new or not valid for {@code topVer}.
 *
 * @param topVer Topology version.
 * @throws GridCacheEntryRemovedException If this entry is obsolete.
 */
public void initializeFromDht(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException {
    GridDhtCacheEntry dhtEntry = cctx.near().dht().peekExx(key);

    if (dhtEntry == null)
        return;

    GridCacheEntryInfo info = dhtEntry.info();

    if (info == null)
        return;

    GridCacheVersion enqueueVer = null;

    try {
        // Primary is resolved before the entry lock is taken.
        ClusterNode primaryNode = cctx.affinity().primaryByKey(key, topVer);

        lockEntry();

        try {
            checkObsolete();

            if (isNew() || !valid(topVer)) {
                // Version does not change for load ops.
                update(info.value(), info.expireTime(), info.ttl(), info.isNew() ? ver : info.version(), true);

                if (cctx.deferredDelete() && !isNew() && !isInternal()) {
                    boolean nowDeleted = val == null;

                    if (nowDeleted != deletedUnlocked()) {
                        deletedUnlocked(nowDeleted);

                        if (nowDeleted)
                            enqueueVer = info.version();
                    }
                }

                if (primaryNode != null)
                    recordNodeId(primaryNode.id(), topVer);
                else
                    this.topVer = AffinityTopologyVersion.NONE;

                dhtVer = info.isNew() || info.isDeleted() ? null : info.version();
            }
        }
        finally {
            unlockEntry();
        }
    }
    finally {
        // Deferred delete is enqueued only after the entry lock has been released.
        if (enqueueVer != null)
            cctx.onDeferredDelete(this, enqueueVer);
    }
}
/**
 * Resets value and versions of this entry from data received from the primary node, unless the recorded
 * DHT version already equals the given one.
 * <p>
 * This method should be called only when lock is owned on this entry.
 *
 * @param val Value.
 * @param ver Version.
 * @param dhtVer DHT version.
 * @param primaryNodeId Primary node ID.
 * @param topVer Topology version.
 * @return {@code True} if reset was done.
 * @throws GridCacheEntryRemovedException If obsolete.
 */
public boolean resetFromPrimary(
    CacheObject val,
    GridCacheVersion ver,
    GridCacheVersion dhtVer,
    UUID primaryNodeId,
    AffinityTopologyVersion topVer
) throws GridCacheEntryRemovedException {
    assert dhtVer != null;

    cctx.versions().onReceived(primaryNodeId, dhtVer);

    lockEntry();

    try {
        checkObsolete();

        primaryNode(primaryNodeId, topVer);

        // Same DHT version as already recorded: nothing to reset.
        if (F.eq(this.dhtVer, dhtVer))
            return false;

        value(val);

        this.ver = ver;
        this.dhtVer = dhtVer;

        return true;
    }
    finally {
        unlockEntry();
    }
}
/**
 * Tries to evict (mark obsolete) this entry once the given DHT version matches the recorded one; if the entry
 * cannot be marked obsolete, it is updated with the supplied value, TTL, expire time and primary node instead.
 * Does nothing if the entry is already obsolete.
 * <p>
 * This method should be called only when lock is owned on this entry.
 *
 * @param dhtVer DHT version.
 * @param val Value associated with version.
 * @param expireTime Expire time.
 * @param ttl Time to live.
 * @param primaryNodeId Primary node ID.
 * @param topVer Topology version.
 */
public void updateOrEvict(GridCacheVersion dhtVer,
    @Nullable CacheObject val,
    long expireTime,
    long ttl,
    UUID primaryNodeId,
    AffinityTopologyVersion topVer)
{
    assert dhtVer != null;

    cctx.versions().onReceived(primaryNodeId, dhtVer);

    lockEntry();

    try {
        if (obsolete())
            return;

        // Don't set DHT version to null until we get a match from DHT remote transaction.
        if (F.eq(this.dhtVer, dhtVer))
            this.dhtVer = null;

        // If we are here, then we already tried to evict this entry.
        // If cannot evict, then update.
        if (this.dhtVer == null && !markObsolete(cctx.cache().nextVersion())) {
            value(val);

            // Pass TTL through as a long: narrowing it to int would silently truncate
            // any TTL larger than Integer.MAX_VALUE.
            ttlAndExpireTimeExtras(ttl, expireTime);

            primaryNode(primaryNodeId, topVer);
        }
    }
    finally {
        unlockEntry();
    }
}
/**
 * Reads the recorded DHT version under the entry lock.
 *
 * @return DHT version for this entry, possibly {@code null}.
 * @throws GridCacheEntryRemovedException If obsolete.
 */
@Nullable public GridCacheVersion dhtVersion() throws GridCacheEntryRemovedException {
    lockEntry();

    try {
        checkObsolete();

        return this.dhtVer;
    }
    finally {
        unlockEntry();
    }
}
/**
 * Reads the recorded DHT version together with the current value under the entry lock.
 *
 * @return Tuple with DHT version and value of this entry, or {@code null} if no DHT version is recorded.
 * @throws GridCacheEntryRemovedException If entry has been removed.
 */
@Nullable public IgniteBiTuple<GridCacheVersion, CacheObject> versionedValue()
    throws GridCacheEntryRemovedException {
    lockEntry();

    try {
        checkObsolete();

        if (dhtVer == null)
            return null;

        CacheObject curVal = val;

        return F.t(dhtVer, curVal);
    }
    finally {
        unlockEntry();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Must be invoked with the entry lock held; delegates to {@code primaryNode(UUID, AffinityTopologyVersion)}.
 */
@Override protected void recordNodeId(UUID primaryNodeId, AffinityTopologyVersion topVer) {
    assert lockedByCurrentThread();

    // The given version must not be ahead of the ready affinity version.
    assert topVer.compareTo(cctx.affinity().affinityTopologyVersion()) <= 0 : "Affinity not ready [" +
        "topVer=" + topVer +
        ", readyVer=" + cctx.affinity().affinityTopologyVersion() +
        ", cache=" + cctx.name() + ']';

    primaryNode(primaryNodeId, topVer);
}
/**
 * Records the given DHT version unless a greater one is already recorded. Must be invoked with the entry
 * lock held.
 *
 * @param dhtVer DHT version to record.
 * @return {@code False} if given version is lower than existing version.
 */
public final boolean recordDhtVersion(GridCacheVersion dhtVer) {
    assert dhtVer != null;
    assert lockedByCurrentThread();

    // Keep the existing version if it is strictly greater than the offered one.
    if (this.dhtVer != null && this.dhtVer.compareTo(dhtVer) > 0)
        return false;

    this.dhtVer = dhtVer;

    return true;
}
/**
 * {@inheritDoc}
 * <p>
 * Issues a near-cache {@code loadAsync} for this single key, waits for the returned future and looks up
 * the result by {@code keyValue(false)}.
 * NOTE(review): the {@code reload} argument is not used by this implementation.
 */
@Override protected Object readThrough(IgniteInternalTx tx, KeyCacheObject key, boolean reload,
UUID subjId, String taskName) throws IgniteCheckedException {
// Positional flags below are passed as-is; only the ones annotated inline have a meaning visible here.
return cctx.near().loadAsync(tx,
F.asList(key),
/*force primary*/false,
subjId,
taskName,
true,
/*recovery should have already been checked*/
false,
null,
false,
/*skip store*/false,
false
).get().get(keyValue(false));
}
/**
 * Applies a value loaded from the primary node to this near entry. The entry is updated only when no DHT
 * version is recorded yet, the recorded DHT version is lower than {@code dhtVer}, or the entry is not valid
 * for {@code topVer}. Optionally records an {@code EVT_CACHE_OBJECT_READ} event.
 *
 * @param tx Transaction.
 * @param primaryNodeId Primary node ID.
 * @param val New value.
 * @param ver Version to use.
 * @param dhtVer DHT version received from remote node.
 * @param ttl Time to live.
 * @param expireTime Expiration time.
 * @param evt Event flag: when {@code true} and the read event is recordable, the event is added.
 * @param keepBinary Keep binary flag, forwarded to the recorded read event.
 * @param topVer Topology version.
 * @param subjId Subject ID.
 * @return {@code True} if initial value was set.
 * @throws IgniteCheckedException In case of error.
 * @throws GridCacheEntryRemovedException If entry was removed.
 */
public boolean loadedValue(@Nullable IgniteInternalTx tx,
UUID primaryNodeId,
CacheObject val,
GridCacheVersion ver,
GridCacheVersion dhtVer,
long ttl,
long expireTime,
boolean evt,
boolean keepBinary,
AffinityTopologyVersion topVer,
UUID subjId)
throws IgniteCheckedException, GridCacheEntryRemovedException {
assert dhtVer != null;
// Set only when the update turns the entry into a deleted one; consumed after unlock.
GridCacheVersion enqueueVer = null;
lockEntry();
try {
checkObsolete();
// NOTE(review): presumably registers a read miss in cache metrics - confirm against the metrics API.
if (cctx.statisticsEnabled())
cctx.cache().metrics0().onRead(false);
boolean ret = false;
// Snapshot of the previous value state, captured before any update, for the read event below.
CacheObject old = this.val;
boolean hasVal = hasValueUnlocked();
if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0 || !valid(topVer)) {
primaryNode(primaryNodeId, topVer);
update(val, expireTime, ttl, ver, true);
// Special case for platform cache: start tracking near entry.
updatePlatformCache(val, topVer);
if (cctx.deferredDelete() && !isInternal()) {
boolean deleted = val == null;
// Flip the deleted flag only when it actually changes.
if (deleted != deletedUnlocked()) {
deletedUnlocked(deleted);
if (deleted)
enqueueVer = ver;
}
}
this.dhtVer = dhtVer;
ret = true;
}
// The event is recorded regardless of whether the entry was updated above.
if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
cctx.events().addEvent(
partition(),
key,
tx,
null,
EVT_CACHE_OBJECT_READ,
val,
val != null,
old,
hasVal,
subjId,
null,
null,
keepBinary);
return ret;
}
finally {
unlockEntry();
// Deferred delete is enqueued only after the entry lock has been released.
if (enqueueVer != null)
cctx.onDeferredDelete(this, enqueueVer);
}
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation stores nothing and always returns {@code false}.
 */
@Override protected boolean storeValue(CacheObject val, long expireTime, GridCacheVersion ver) {
    // No-op: queries are disabled for near cache.
    return false;
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation intentionally does nothing.
 */
@Override protected void removeValue() {
    // No-op.
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation intentionally does nothing.
 */
@Override protected void logUpdate(
    GridCacheOperation op,
    CacheObject val,
    GridCacheVersion ver,
    long expireTime,
    long updCntr,
    boolean primary
) {
    // No-op: queries are disabled for near cache.
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation logs nothing and always returns {@code null}.
 */
@Override protected WALPointer logTxUpdate(
    IgniteInternalTx tx,
    CacheObject val,
    long expireTime,
    long updCntr
) {
    return null;
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation performs no unswap and always returns {@code null}.
 */
@Nullable @Override public CacheDataRow unswap(CacheDataRow row, boolean checkExpire) {
    return null;
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to {@code addNearLocal} without a DHT node ID.
 */
@Override public GridCacheMvccCandidate addLocal(
    long threadId,
    GridCacheVersion ver,
    AffinityTopologyVersion topVer,
    long timeout,
    boolean reenter,
    boolean tx,
    boolean implicitSingle,
    boolean read
) throws GridCacheEntryRemovedException {
    // First argument is the DHT node ID, which is not supplied here.
    return addNearLocal(null, threadId, ver, topVer, timeout, reenter, tx, implicitSingle, read);
}
/**
 * Add near local candidate.
 * <p>
 * Candidate bookkeeping is done under the entry lock; the owner-changed notification is issued after the
 * lock is released.
 *
 * @param dhtNodeId DHT node ID.
 * @param threadId Owning thread ID.
 * @param ver Lock version.
 * @param topVer Topology version.
 * @param timeout Timeout to acquire lock.
 * @param reenter Reentry flag.
 * @param tx Transaction flag.
 * @param implicitSingle Implicit flag.
 * @param read Read lock flag.
 * @return New candidate; the reentered candidate if one already exists for this thread or version and
 *      {@code reenter} is set; {@code null} if such a candidate exists but reentry is not allowed, or if
 *      {@code timeout} is negative and other candidates are already present.
 * @throws GridCacheEntryRemovedException If entry has been removed.
 */
@Nullable GridCacheMvccCandidate addNearLocal(
@Nullable UUID dhtNodeId,
long threadId,
GridCacheVersion ver,
AffinityTopologyVersion topVer,
long timeout,
boolean reenter,
boolean tx,
boolean implicitSingle,
boolean read)
throws GridCacheEntryRemovedException {
CacheLockCandidates prev;
CacheLockCandidates owner = null;
GridCacheMvccCandidate cand;
CacheObject val;
UUID locId = cctx.nodeId();
lockEntry();
try {
checkObsolete();
// Lazily create the MVCC holder for this entry.
GridCacheMvcc mvcc = mvccExtras();
if (mvcc == null) {
mvcc = new GridCacheMvcc(cctx);
mvccExtras(mvcc);
}
// An existing local candidate for this thread or version is either reentered or rejected.
GridCacheMvccCandidate c = mvcc.localCandidateByThreadOrVer(locId, threadId, ver);
if (c != null)
return reenter ? c.reenter() : null;
// Owners before the new candidate is added; compared against owners afterwards.
prev = mvcc.allOwners();
boolean emptyBefore = mvcc.isEmpty();
// Lock could not be acquired.
// NOTE(review): a negative timeout appears to mean "do not wait behind other candidates" - confirm.
if (timeout < 0 && !emptyBefore)
return null;
// Local lock for near cache is a local lock.
cand = mvcc.addNearLocal(
this,
locId,
dhtNodeId,
threadId,
ver,
tx,
implicitSingle,
read);
cand.topologyVersion(topVer);
boolean emptyAfter = mvcc.isEmpty();
checkCallbacks(emptyBefore, emptyAfter);
// Value is captured under the lock for the notification issued after unlock.
val = this.val;
// Drop the MVCC holder if it is empty, otherwise remember current owners.
if (emptyAfter)
mvccExtras(null);
else
owner = mvcc.allOwners();
}
finally {
unlockEntry();
}
// This call must be outside of synchronization.
checkOwnerChanged(prev, owner, val);
return cand;
}
/**
 * Sets the DHT node ID on the lock candidate with the given version, if such a candidate exists.
 *
 * @param ver Version to set DHT node ID for.
 * @param dhtNodeId DHT node ID.
 * @return The candidate that was updated, or {@code null} if no candidate with this version was found.
 * @throws GridCacheEntryRemovedException If entry is removed.
 */
@Nullable public GridCacheMvccCandidate dhtNodeId(GridCacheVersion ver, UUID dhtNodeId)
    throws GridCacheEntryRemovedException {
    lockEntry();

    try {
        checkObsolete();

        GridCacheMvcc mvcc = mvccExtras();

        // No MVCC holder means there are no candidates at all.
        if (mvcc == null)
            return null;

        GridCacheMvccCandidate found = mvcc.candidate(ver);

        if (found != null)
            found.otherNodeId(dhtNodeId);

        return found;
    }
    finally {
        unlockEntry();
    }
}
/**
 * Unlocks local lock held by the current thread.
 * <p>
 * If the current thread's candidate was reentered, only one reentry is released and the lock stays held.
 * Otherwise the candidate is removed, explicit-lock bookkeeping is updated and owner-changed notification
 * is issued after the entry lock is released.
 *
 * @return Removed candidate; the released reentry candidate if the lock was reentered; or <tt>null</tt> if
 *      the current thread has no local candidate that currently owns the lock.
 */
@Nullable @Override public GridCacheMvccCandidate removeLock() {
CacheLockCandidates prev = null;
CacheLockCandidates owner = null;
CacheObject val;
UUID locId = cctx.nodeId();
GridCacheMvccCandidate cand = null;
lockEntry();
try {
GridCacheMvcc mvcc = mvccExtras();
if (mvcc != null) {
prev = mvcc.allOwners();
boolean emptyBefore = mvcc.isEmpty();
// Candidate is looked up by local node ID and the calling thread's ID.
cand = mvcc.localCandidate(locId, Thread.currentThread().getId());
assert cand == null || cand.nearLocal();
if (cand != null && cand.owner()) {
// If a reentry, then release reentry. Otherwise, remove lock.
GridCacheMvccCandidate reentry = cand.unenter();
if (reentry != null) {
assert reentry.reentry();
return reentry;
}
mvcc.remove(cand.version());
owner = mvcc.allOwners();
}
else
return null;
boolean emptyAfter = mvcc.isEmpty();
checkCallbacks(emptyBefore, emptyAfter);
// Drop the MVCC holder once it has no candidates left.
if (emptyAfter)
mvccExtras(null);
}
// Value is captured under the lock for the notification issued after unlock.
val = this.val;
}
finally {
unlockEntry();
}
// NOTE(review): when mvccExtras() is null, cand is still null here, so this assertion fails (and with
// assertions disabled a null candidate is passed on below). Callers apparently must only invoke this
// method while a candidate exists - confirm.
assert cand != null;
assert owner != prev;
if (log.isDebugEnabled())
log.debug("Released local candidate from entry [owner=" + owner + ", prev=" + prev +
", entry=" + this + ']');
cctx.mvcc().removeExplicitLock(cand);
checkThreadChain(cand);
// This call must be outside of synchronization.
checkOwnerChanged(prev, owner, val);
return cand;
}
/**
 * {@inheritDoc}
 * <p>
 * Resets the recorded topology version to {@link AffinityTopologyVersion#NONE} and clears the DHT version.
 */
@Override protected void onInvalidate() {
    this.topVer = AffinityTopologyVersion.NONE;
    this.dhtVer = null;
}
/**
 * Increments the eviction reservation counter under the entry lock.
 *
 * @throws GridCacheEntryRemovedException If entry was removed.
 */
void reserveEviction() throws GridCacheEntryRemovedException {
    lockEntry();

    try {
        checkObsolete();

        ++evictReservations;
    }
    finally {
        unlockEntry();
    }
}
/**
 * Decrements the eviction reservation counter under the entry lock. A reservation must be outstanding
 * and the entry must not be obsolete.
 */
void releaseEviction() {
    lockEntry();

    try {
        assert evictReservations > 0 : this;
        assert !obsolete() : this;

        --evictReservations;
    }
    finally {
        unlockEntry();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Eviction is disabled while at least one eviction reservation is outstanding. Must be invoked with the
 * entry lock held.
 */
@Override protected boolean evictionDisabled() {
    assert lockedByCurrentThread();

    boolean hasReservations = evictReservations > 0;

    return hasReservations;
}
/**
 * {@inheritDoc}
 * <p>
 * Invokes {@code updatePlatformCache} with a {@code null} value and a {@code null} topology version.
 */
@Override public void onMarkedObsolete() {
    // GridCacheMapEntry.onMarkedObsolete is called immediately after performing operation for any
    // non-primary key.
    updatePlatformCache(null, null);
}
/**
 * Validates the given node against the affinity primary for this entry's partition at {@code topVer}.
 * If the node is the primary, the recorded topology version is advanced to {@code topVer} when it is
 * greater; otherwise (including when the primary cannot be resolved) the recorded version is reset to
 * {@link AffinityTopologyVersion#NONE}. Must be invoked with the entry lock held.
 *
 * @param nodeId Primary node ID.
 * @param topVer Topology version.
 */
private void primaryNode(UUID nodeId, AffinityTopologyVersion topVer) {
    assert lockedByCurrentThread();
    assert nodeId != null;

    ClusterNode affPrimary;

    try {
        affPrimary = cctx.affinity().primaryByPartition(part, topVer);
    }
    catch (IllegalStateException ignore) {
        // Do not have affinity history.
        affPrimary = null;
    }

    boolean isPrimary = affPrimary != null && nodeId.equals(affPrimary.id());

    if (!isPrimary)
        this.topVer = AffinityTopologyVersion.NONE;
    else if (topVer.compareTo(this.topVer) > 0)
        this.topVer = topVer;
}
/**
 * {@inheritDoc}
 * <p>
 * The string is produced through {@code toStringWithTryLock} and includes the superclass representation
 * under the {@code "super"} name.
 */
@Override public String toString() {
    return toStringWithTryLock(
        () -> S.toString(GridNearCacheEntry.class, this, "super", super.toString())
    );
}
}