blob: d2ea1068096b44589ffd30816c5758974d5aaea4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.concurrent.atomic.LongAdder;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
/**
* Eagerly removes expired entries from cache when
* {@link CacheConfiguration#isEagerTtl()} flag is set.
*/
public class GridCacheTtlManager extends GridCacheManagerAdapter {
/** @see IgniteSystemProperties#IGNITE_UNWIND_THROTTLING_TIMEOUT */
public static final long DFLT_UNWIND_THROTTLING_TIMEOUT = 500L;
/**
* Throttling timeout in millis which avoid excessive PendingTree access on unwind
* if there is nothing to clean yet.
*/
private final long unwindThrottlingTimeout = Long.getLong(
IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, DFLT_UNWIND_THROTTLING_TIMEOUT);
/** Entries pending removal. This collection tracks entries for near cache only. */
private GridConcurrentSkipListSetEx pendingEntries;
/** Indicates that */
protected volatile boolean hasPendingEntries;
/** Timestamp when next clean try will be allowed. Used for throttling on per-cache basis. */
protected volatile long nextCleanTime;
/** See {@link CacheConfiguration#isEagerTtl()}. */
private volatile boolean eagerTtlEnabled;
/** */
private GridCacheContext dhtCtx;
/** */
private final IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> expireC =
new IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion>() {
@Override public void applyx(GridCacheEntryEx entry, GridCacheVersion obsoleteVer) {
boolean touch = !entry.isNear();
while (true) {
try {
if (log.isTraceEnabled())
log.trace("Trying to remove expired entry from cache: " + entry);
if (entry.onTtlExpired(obsoleteVer))
touch = false;
break;
}
catch (GridCacheEntryRemovedException ignore) {
entry = entry.context().cache().entryEx(entry.key());
touch = true;
}
}
if (touch)
entry.touch();
}
};
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
dhtCtx = cctx.isNear() ? cctx.near().dht().context() : cctx;
boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
!cctx.config().isEagerTtl() ||
CU.isUtilityCache(cctx.name()) ||
cctx.dataStructuresCache() ||
(cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null);
if (cleanupDisabled)
return;
eagerTtlEnabled = true;
cctx.shared().ttl().register(this);
pendingEntries = (!cctx.isLocal() && cctx.config().getNearConfiguration() != null) ? new GridConcurrentSkipListSetEx() : null;
}
/**
* @return {@code True} if eager ttl is enabled for cache.
*/
public boolean eagerTtlEnabled() {
return eagerTtlEnabled;
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
if (pendingEntries != null)
pendingEntries.clear();
}
/**
* Unregister this TTL manager of cache from periodical check on expired entries.
*/
public void unregister() {
// Ignoring attempt to unregister manager that has never been started.
if (!starting.get())
return;
cctx.shared().ttl().unregister(this);
}
/**
* Adds tracked entry to ttl processor.
*
* @param entry Entry to add.
*/
void addTrackedEntry(GridNearCacheEntry entry) {
assert entry.lockedByCurrentThread();
EntryWrapper e = new EntryWrapper(entry);
pendingEntries.add(e);
}
/**
* @param entry Entry to remove.
*/
void removeTrackedEntry(GridNearCacheEntry entry) {
assert entry.lockedByCurrentThread();
pendingEntries.remove(new EntryWrapper(entry));
}
/**
* @return The size of pending entries.
* @throws IgniteCheckedException If failed.
*/
public long pendingSize() throws IgniteCheckedException {
return (pendingEntries != null ? pendingEntries.sizex() : 0) + cctx.offheap().expiredSize();
}
/**
* Updates the flag {@code hasPendingEntries} with the given value.
*
* @param update {@code true} if the underlying pending tree has entries with expire policy enabled.
*/
public void hasPendingEntries(boolean update) {
hasPendingEntries = update;
}
/**
* @return {@code true} if the underlying pending tree has entries with expire policy enabled.
*/
public boolean hasPendingEntries() {
return hasPendingEntries;
}
/** {@inheritDoc} */
@Override public void printMemoryStats() {
try {
X.println(">>>");
X.println(">>> TTL processor memory stats [igniteInstanceName=" + cctx.igniteInstanceName() +
", cache=" + cctx.name() + ']');
X.println(">>> pendingEntriesSize: " + pendingSize());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to print statistics: " + e, e);
}
}
/**
* Processes specified amount of expired entries.
*
* @param amount Limit of processed entries by single call, {@code -1} for no limit.
* @return {@code True} if unprocessed expired entries remains.
*/
public boolean expire(int amount) {
// TTL manager is not initialized or eagerTtl disabled for cache.
if (!eagerTtlEnabled)
return false;
assert cctx != null;
long now = U.currentTimeMillis();
try {
if (pendingEntries != null) {
GridNearCacheAdapter nearCache = cctx.near();
GridCacheVersion obsoleteVer = null;
int limit = (-1 != amount) ? amount : pendingEntries.sizex();
for (int cnt = limit; cnt > 0; cnt--) {
EntryWrapper e = pendingEntries.firstx();
if (e == null || e.expireTime > now)
break; // All expired entries are processed.
if (pendingEntries.remove(e)) {
if (obsoleteVer == null)
obsoleteVer = cctx.cache().nextVersion();
GridNearCacheEntry nearEntry = nearCache.peekExx(e.key);
if (nearEntry != null)
expireC.apply(nearEntry, obsoleteVer);
}
}
}
if (!cctx.affinityNode())
return false; /* Pending tree never contains entries for that cache */
if (!hasPendingEntries || nextCleanTime > U.currentTimeMillis())
return false;
boolean more = cctx.offheap().expire(dhtCtx, expireC, amount);
if (more)
return true;
// There is nothing to clean, so the next clean up can be postponed.
nextCleanTime = U.currentTimeMillis() + unwindThrottlingTimeout;
if (amount != -1 && pendingEntries != null) {
EntryWrapper e = pendingEntries.firstx();
return e != null && e.expireTime <= now;
}
}
catch (GridDhtInvalidPartitionException e) {
if (log.isDebugEnabled())
log.debug("Partition became invalid during rebalancing (will ignore): " + e.partition());
return false;
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to process entry expiration: " + e, e);
}
catch (IgniteException e) {
if (e.hasCause(NodeStoppingException.class)) {
if (log.isDebugEnabled())
log.debug("Failed to expire because node is stopped: " + e);
}
else
throw e;
}
return false;
}
/**
* @param cctx1 First cache context.
* @param key1 Left key to compare.
* @param cctx2 Second cache context.
* @param key2 Right key to compare.
* @return Comparison result.
*/
private static int compareKeys(GridCacheContext cctx1, CacheObject key1, GridCacheContext cctx2, CacheObject key2) {
int key1Hash = key1.hashCode();
int key2Hash = key2.hashCode();
int res = Integer.compare(key1Hash, key2Hash);
if (res == 0) {
key1 = (CacheObject)cctx1.unwrapTemporary(key1);
key2 = (CacheObject)cctx2.unwrapTemporary(key2);
try {
byte[] key1ValBytes = key1.valueBytes(cctx1.cacheObjectContext());
byte[] key2ValBytes = key2.valueBytes(cctx2.cacheObjectContext());
// Must not do fair array comparison.
res = Integer.compare(key1ValBytes.length, key2ValBytes.length);
if (res == 0) {
for (int i = 0; i < key1ValBytes.length; i++) {
res = Byte.compare(key1ValBytes[i], key2ValBytes[i]);
if (res != 0)
break;
}
}
if (res == 0)
res = Boolean.compare(cctx1.isNear(), cctx2.isNear());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
return res;
}
/**
* Entry wrapper.
*/
private static class EntryWrapper implements Comparable<EntryWrapper> {
/** Entry expire time. */
private final long expireTime;
/** Cache Object Context */
private final GridCacheContext ctx;
/** Cache Object Key */
private final KeyCacheObject key;
/**
* @param entry Cache entry to create wrapper for.
*/
private EntryWrapper(GridCacheMapEntry entry) {
expireTime = entry.expireTimeUnlocked();
assert expireTime != 0;
this.ctx = entry.context();
this.key = entry.key();
}
/** {@inheritDoc} */
@Override public int compareTo(EntryWrapper o) {
int res = Long.compare(expireTime, o.expireTime);
if (res == 0)
res = compareKeys(ctx, key, o.ctx, o.key);
return res;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof EntryWrapper))
return false;
EntryWrapper that = (EntryWrapper)o;
return expireTime == that.expireTime && compareKeys(ctx, key, that.ctx, that.key) == 0;
}
/** {@inheritDoc} */
@Override public int hashCode() {
int res = (int)(expireTime ^ (expireTime >>> 32));
res = 31 * res + key.hashCode();
return res;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(EntryWrapper.class, this);
}
}
/**
* Provides additional method {@code #sizex()}. NOTE: Only the following methods supports this addition:
* <ul>
* <li>{@code #add()}</li>
* <li>{@code #remove()}</li>
* <li>{@code #pollFirst()}</li>
* <ul/>
*/
private static class GridConcurrentSkipListSetEx extends GridConcurrentSkipListSet<EntryWrapper> {
/** */
private static final long serialVersionUID = 0L;
/** Size. */
private final LongAdder size = new LongAdder();
/**
* @return Size based on performed operations.
*/
public int sizex() {
return size.intValue();
}
/** {@inheritDoc} */
@Override public boolean add(EntryWrapper e) {
boolean res = super.add(e);
if (res)
size.increment();
return res;
}
/** {@inheritDoc} */
@Override public boolean remove(Object o) {
boolean res = super.remove(o);
if (res)
size.decrement();
return res;
}
/** {@inheritDoc} */
@Nullable @Override public EntryWrapper pollFirst() {
EntryWrapper e = super.pollFirst();
if (e != null)
size.decrement();
return e;
}
}
}