blob: 62abde197dfa21aa17e1aeb7a74c69a4be7f27db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.io.Externalizable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridCircularBuffer;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
/**
* Near cache for atomic cache.
*/
public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
/** */
private static final long serialVersionUID = 0L;
/** */
private GridDhtCacheAdapter<K, V> dht;
/** Remove queue. */
private GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
/**
* Empty constructor required for {@link Externalizable}.
* <p>
* The body is intentionally empty: no field of this class (including the remove queue) is assigned here.
*/
public GridNearAtomicCache() {
// No-op.
}
/**
 * Creates the near cache and allocates its remove queue.
 * <p>
 * The delete history size is {@code 100} for system caches and otherwise taken from the
 * {@code IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE} system property (default {@code 1_000_000}).
 * The queue capacity is one tenth of that size passed through {@code U.ceilPow2}.
 *
 * @param ctx Context.
 */
public GridNearAtomicCache(GridCacheContext<K, V> ctx) {
    super(ctx);

    final int histSize;

    if (CU.isSystemCache(ctx.name()))
        histSize = 100;
    else
        histSize = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 1_000_000);

    final int queueCap = U.ceilPow2(histSize / 10);

    rmvQueue = new GridCircularBuffer<>(queueCap);
}
/**
 * {@inheritDoc}
 * <p>
 * Once the parent has started, registers a cache handler that passes every incoming
 * {@link GridNearGetResponse} to {@code processGetResponse}.
 */
@Override public void start() throws IgniteCheckedException {
    super.start();

    CI2<UUID, GridNearGetResponse> getResHnd = new CI2<UUID, GridNearGetResponse>() {
        @Override public void apply(UUID nodeId, GridNearGetResponse res) {
            processGetResponse(nodeId, res);
        }
    };

    ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, getResHnd);
}
/**
* Sets the DHT cache that the put, remove, invoke and lock operations of this near cache are forwarded to.
*
* @param dht DHT cache.
*/
public void dht(GridDhtAtomicCache<K, V> dht) {
this.dht = dht;
}
/**
 * {@inheritDoc}
 * <p>
 * Returns whatever was last passed to the DHT cache setter of this class.
 */
@Override public GridDhtCacheAdapter<K, V> dht() {
    return this.dht;
}
/**
 * Applies a near atomic update response to the local near cache, key by key.
 * <p>
 * Keys listed as skipped or failed in the response are left alone. For a key whose partition now belongs
 * to the local node the near entry is marked obsolete and removed. For the remaining keys the stored value
 * is the one returned in the response when present, otherwise the one sent in the request
 * ({@code null} for {@code DELETE}). A failure to update a key is recorded in the response as a failed key.
 *
 * @param req Update request.
 * @param res Update response.
 */
public void processNearAtomicUpdateResponse(
    GridNearAtomicAbstractUpdateRequest req,
    GridNearAtomicUpdateResponse res
) {
    int failedCnt = F.size(res.failedKeys());

    // Every key failed: there is nothing to store.
    if (failedCnt == req.size())
        return;

    Collection<KeyCacheObject> failedKeys = res.failedKeys();
    List<Integer> nearValIdxs = res.nearValuesIndexes();
    List<Integer> skippedIdxs = res.skippedIndexes();

    GridCacheVersion nearVer = res.nearVersion();

    assert nearVer != null : "Failed to find version [req=" + req + ", res=" + res + ']';

    // Position of the next unread value among the near values carried by the response.
    int nextNearVal = 0;

    String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());

    for (int idx = 0; idx < req.size(); idx++) {
        if (F.contains(skippedIdxs, idx))
            continue;

        KeyCacheObject key = req.key(idx);

        if (F.contains(failedKeys, key))
            continue;

        boolean ownedLocally = ctx.affinity().partitionBelongs(
            ctx.localNode(),
            ctx.affinity().partition(key),
            req.topologyVersion());

        if (ownedLocally) { // Reader became backup.
            GridCacheEntryEx stale = peekEx(key);

            if (stale != null && stale.markObsolete(nearVer))
                removeEntry(stale);

            continue;
        }

        CacheObject val;

        if (!F.contains(nearValIdxs, idx)) {
            // No value came back in the response: fall back to the value of the request.
            assert req.operation() != TRANSFORM;

            val = req.operation() != DELETE ? req.value(idx) : null;
        }
        else {
            val = res.nearValue(nextNearVal);

            nextNearVal++;
        }

        long ttl = res.nearTtl(idx);
        long expireTime = res.nearExpireTime(idx);

        boolean calcExpireTime = ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE;

        if (calcExpireTime)
            expireTime = CU.toExpireTime(ttl);

        try {
            processNearAtomicUpdateResponse(
                nearVer,
                key,
                val,
                ttl,
                expireTime,
                req.keepBinary(),
                req.nodeId(),
                req.subjectId(),
                taskName,
                req.operation() == TRANSFORM);
        }
        catch (IgniteCheckedException e) {
            res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e));
        }
    }
}
/**
* Writes one key's new state into the local near cache entry.
* <p>
* The operation is {@code UPDATE} for a non-null {@code val} and {@code DELETE} otherwise. The attempt is
* repeated while the entry is reported as removed; a {@link GridDhtInvalidPartitionException} ends the call
* without an update and without an error.
*
* @param ver Version.
* @param key Key.
* @param val Value.
* @param ttl TTL.
* @param expireTime Expire time.
* @param keepBinary Keep binary flag, forwarded as is to the entry update.
* @param nodeId Node ID.
* @param subjId Subject ID.
* @param taskName Task name.
* @param transformedValue {@code True} if transformed value.
* @throws IgniteCheckedException If failed.
*/
private void processNearAtomicUpdateResponse(
GridCacheVersion ver,
KeyCacheObject key,
@Nullable CacheObject val,
long ttl,
long expireTime,
boolean keepBinary,
UUID nodeId,
UUID subjId,
String taskName,
boolean transformedValue) throws IgniteCheckedException {
try {
// Retry loop: it only repeats when the entry was removed concurrently (see the catch below).
while (true) {
GridCacheEntryEx entry = null;
// The current affinity topology version is re-read on every attempt.
AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
try {
entry = entryEx(key, topVer);
// A null value means the key is being deleted from the near cache.
GridCacheOperation op = val != null ? UPDATE : DELETE;
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
nodeId,
nodeId,
op,
val,
null,
/*write-through*/false,
/*read-through*/false,
/*retval*/false,
keepBinary,
/*expiry policy*/null,
/*event*/true,
/*metrics*/true,
/*primary*/false,
/*check version*/true,
topVer,
CU.empty0(),
DR_NONE,
ttl,
expireTime,
null,
false,
false,
subjId,
taskName,
null,
null,
null,
transformedValue);
// A non-null remove version is handed over to the deferred-delete handling of the context.
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
break; // While.
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got removed entry while updating near cache value (will retry): " + key);
// Cleared so that the finally block below does not touch the removed entry.
entry = null;
}
finally {
if (entry != null)
entry.touch();
}
}
}
catch (GridDhtInvalidPartitionException ignored) {
// Ignore.
}
}
/**
* Applies the near-cache part of a DHT atomic update request to the local near entries.
* <p>
* Near keys that have no local entry are not updated; they are collected and returned instead. Keys whose
* update fails are recorded in {@code res}. Afterwards every obsolete near key of the request is marked
* obsolete and removed.
*
* @param nodeId Sender node ID.
* @param req Dht atomic update request.
* @param res Dht atomic update response.
* @return Evicted near keys (if any), or {@code null} when every near key had a local entry.
*/
@Nullable public List<KeyCacheObject> processDhtAtomicUpdateRequest(
UUID nodeId,
GridDhtAtomicAbstractUpdateRequest req,
GridDhtAtomicNearResponse res
) {
GridCacheVersion ver = req.writeVersion();
assert ver != null;
// Interception is requested only if backups are forced to transform and an interceptor is configured.
boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null;
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
// Allocated lazily on the first near key that has no local entry.
List<KeyCacheObject> nearEvicted = null;
for (int i = 0; i < req.nearSize(); i++) {
KeyCacheObject key = req.nearKey(i);
try {
// Retry loop: it only repeats when the entry was removed concurrently (see the catch below).
while (true) {
try {
GridCacheEntryEx entry = peekEx(key);
// No local near entry: report the key as evicted instead of updating it.
if (entry == null) {
if (nearEvicted == null)
nearEvicted = new ArrayList<>();
nearEvicted.add(key);
break;
}
CacheObject val = req.nearValue(i);
EntryProcessor<Object, Object, Object> entryProcessor = req.nearEntryProcessor(i);
// An entry processor takes precedence; otherwise a null value means delete.
GridCacheOperation op = entryProcessor != null ? TRANSFORM :
(val != null) ? UPDATE : DELETE;
long ttl = req.nearTtl(i);
long expireTime = req.nearExpireTime(i);
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
nodeId,
nodeId,
op,
op == TRANSFORM ? entryProcessor : val,
op == TRANSFORM ? req.invokeArguments() : null,
/*write-through*/false,
/*read-through*/false,
/*retval*/false,
req.keepBinary(),
null,
/*event*/true,
/*metrics*/true,
/*primary*/false,
/*check version*/!req.forceTransformBackups(),
req.topologyVersion(),
CU.empty0(),
DR_NONE,
ttl,
expireTime,
null,
false,
intercept,
req.subjectId(),
taskName,
null,
null,
null,
false);
// A non-null remove version is handed over to the deferred-delete handling of the context.
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
break;
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got removed entry while updating near value (will retry): " + key);
}
}
}
catch (IgniteCheckedException e) {
// The failure is reported per key; processing continues with the next near key.
res.addFailedKey(key, new IgniteCheckedException("Failed to update near cache key: " + key, e));
}
}
// Drop the near entries that the request lists as obsolete.
for (int i = 0; i < req.obsoleteNearKeysSize(); i++) {
KeyCacheObject key = req.obsoleteNearKey(i);
GridCacheEntryEx entry = peekEx(key);
if (entry != null && entry.markObsolete(ver))
removeEntry(entry);
}
return nearEvicted;
}
/**
 * {@inheritDoc}
 * <p>
 * Checks the {@code CACHE_READ} permission first. An empty key collection yields an already finished
 * future holding an empty map; otherwise the keys are optionally validated and the read is handed to
 * {@code loadAsync}. The expiry policy of the per-call operation context is used only when values are
 * requested.
 */
@Override protected IgniteInternalFuture<Map<K, V>> getAllAsync(
    @Nullable Collection<? extends K> keys,
    boolean forcePrimary,
    boolean skipTx,
    @Nullable UUID subjId,
    String taskName,
    boolean deserializeBinary,
    boolean recovery,
    boolean skipVals,
    boolean needVer
) {
    ctx.checkSecurity(SecurityPermission.CACHE_READ);

    if (F.isEmpty(keys)) {
        Map<K, V> nothing = Collections.<K, V>emptyMap();

        return new GridFinishedFuture<>(nothing);
    }

    if (keyCheck)
        validateCacheKeys(keys);

    CacheOperationContext callCtx = ctx.operationContextPerCall();

    UUID callerId = ctx.subjectIdPerCall(subjId, callCtx);

    return loadAsync(
        null,
        ctx.cacheKeysView(keys),
        forcePrimary,
        callerId,
        taskName,
        deserializeBinary,
        recovery,
        (skipVals || callCtx == null) ? null : callCtx.expiry(),
        skipVals,
        callCtx != null && callCtx.skipStore(),
        needVer);
}
/**
 * {@inheritDoc}
 * <p>
 * The call is forwarded unchanged to the DHT cache.
 */
@Override public V getAndPut(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException {
    final V prev = dht.getAndPut(key, val, filter);

    return prev;
}
/**
 * {@inheritDoc}
 * <p>
 * The call is forwarded unchanged to the DHT cache.
 */
@Override public boolean put(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException {
    final boolean stored = dht.put(key, val, filter);

    return stored;
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache.
 */
@Override public IgniteInternalFuture<V> getAndPutAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
    final IgniteInternalFuture<V> prevFut = dht.getAndPutAsync0(key, val, filter);

    return prevFut;
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache.
 */
@Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
    final IgniteInternalFuture<Boolean> storedFut = dht.putAsync0(key, val, filter);

    return storedFut;
}
/**
 * {@inheritDoc}
 * <p>
 * The whole map is passed on to the DHT cache.
 */
@Override public void putAll(Map<? extends K, ? extends V> m) throws IgniteCheckedException {
    final GridDhtCacheAdapter<K, V> target = dht;

    target.putAll(m);
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache.
 */
@Override public IgniteInternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m) {
    final IgniteInternalFuture<?> doneFut = dht.putAllAsync(m);

    return doneFut;
}
/**
 * {@inheritDoc}
 * <p>
 * The DR map is passed on to the DHT cache.
 */
@Override public void putAllConflict(Map<KeyCacheObject, GridCacheDrInfo> drMap) throws IgniteCheckedException {
    final GridDhtCacheAdapter<K, V> target = dht;

    target.putAllConflict(drMap);
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache.
 */
@Override public IgniteInternalFuture<?> putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> drMap)
    throws IgniteCheckedException {
    final IgniteInternalFuture<?> doneFut = dht.putAllConflictAsync(drMap);

    return doneFut;
}
/**
 * {@inheritDoc}
 * <p>
 * The entry processor is not run here; the invocation is forwarded to the DHT cache.
 */
@Override public <T> EntryProcessorResult<T> invoke(
    K key,
    EntryProcessor<K, V, T> entryProcessor,
    Object... args
) throws IgniteCheckedException {
    final EntryProcessorResult<T> invokeRes = dht.invoke(key, entryProcessor, args);

    return invokeRes;
}
/**
 * {@inheritDoc}
 * <p>
 * The invocation for the given key set is forwarded to the DHT cache.
 */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
    Set<? extends K> keys,
    EntryProcessor<K, V, T> entryProcessor,
    Object... args
) throws IgniteCheckedException {
    final Map<K, EntryProcessorResult<T>> resByKey = dht.invokeAll(keys, entryProcessor, args);

    return resByKey;
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache for the given key-to-processor map.
 */
@Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
    Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
    Object... args
) {
    final IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> resFut = dht.invokeAllAsync(map, args);

    return resFut;
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache.
 */
@Override public <T> IgniteInternalFuture<EntryProcessorResult<T>> invokeAsync(
    K key,
    EntryProcessor<K, V, T> entryProcessor,
    Object... args
) throws EntryProcessorException {
    final IgniteInternalFuture<EntryProcessorResult<T>> resFut = dht.invokeAsync(key, entryProcessor, args);

    return resFut;
}
/**
 * {@inheritDoc}
 * <p>
 * Implemented on top of the asynchronous DHT variant: the DHT future is obtained and then waited on.
 */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
    Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
    Object... args
) throws IgniteCheckedException {
    final IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> resFut = dht.invokeAllAsync(map, args);

    return resFut.get();
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache for the given key set.
 */
@Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
    Set<? extends K> keys,
    EntryProcessor<K, V, T> entryProcessor,
    Object... args
) {
    final IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> resFut =
        dht.invokeAllAsync(keys, entryProcessor, args);

    return resFut;
}
/**
 * {@inheritDoc}
 * <p>
 * The filtered removal is forwarded unchanged to the DHT cache.
 */
@Override public boolean remove(K key, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException {
    final boolean rmvd = dht.remove(key, filter);

    return rmvd;
}
/**
 * {@inheritDoc}
 * <p>
 * The call is forwarded unchanged to the DHT cache.
 */
@Override public V getAndRemove(K key) throws IgniteCheckedException {
    final V prev = dht.getAndRemove(key);

    return prev;
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache.
 */
@Override public IgniteInternalFuture<V> getAndRemoveAsync(K key) {
    final IgniteInternalFuture<V> prevFut = dht.getAndRemoveAsync(key);

    return prevFut;
}
/**
 * {@inheritDoc}
 * <p>
 * The key collection is passed on to the DHT cache.
 */
@Override public void removeAll(Collection<? extends K> keys) throws IgniteCheckedException {
    final GridDhtCacheAdapter<K, V> target = dht;

    target.removeAll(keys);
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache for the given keys.
 */
@Override public IgniteInternalFuture<?> removeAllAsync(Collection<? extends K> keys) {
    final IgniteInternalFuture<?> doneFut = dht.removeAllAsync(keys);

    return doneFut;
}
/**
 * {@inheritDoc}
 * <p>
 * The removal is forwarded unchanged to the DHT cache.
 */
@Override public boolean remove(K key) throws IgniteCheckedException {
    final boolean rmvd = dht.remove(key);

    return rmvd;
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache.
 */
@Override public IgniteInternalFuture<Boolean> removeAsync(K key, @Nullable CacheEntryPredicate filter) {
    final IgniteInternalFuture<Boolean> rmvdFut = dht.removeAsync(key, filter);

    return rmvdFut;
}
/**
 * {@inheritDoc}
 * <p>
 * The request is passed on to the DHT cache.
 */
@Override public void removeAll() throws IgniteCheckedException {
    final GridDhtCacheAdapter<K, V> target = dht;

    target.removeAll();
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache.
 */
@Override public IgniteInternalFuture<?> removeAllAsync() {
    final IgniteInternalFuture<?> doneFut = dht.removeAllAsync();

    return doneFut;
}
/**
 * {@inheritDoc}
 * <p>
 * The DR map is passed on to the DHT cache.
 */
@Override public void removeAllConflict(Map<KeyCacheObject, GridCacheVersion> drMap)
    throws IgniteCheckedException {
    final GridDhtCacheAdapter<K, V> target = dht;

    target.removeAllConflict(drMap);
}
/**
 * {@inheritDoc}
 * <p>
 * The future returned is the one produced by the DHT cache.
 */
@Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> drMap)
    throws IgniteCheckedException {
    final IgniteInternalFuture<?> doneFut = dht.removeAllConflictAsync(drMap);

    return doneFut;
}
/**
 * {@inheritDoc}
 * <p>
 * Of all arguments only {@code timeout} is used: the DHT cache is asked to lock with a {@code null}
 * key collection and that timeout.
 */
@Override protected IgniteInternalFuture<Boolean> lockAllAsync(
    Collection<KeyCacheObject> keys,
    long timeout,
    @Nullable IgniteTxLocalEx tx,
    boolean isInvalidate,
    boolean isRead,
    boolean retval,
    @Nullable TransactionIsolation isolation,
    long createTtl,
    long accessTtl
) {
    final IgniteInternalFuture<Boolean> lockFut = dht.lockAllAsync(null, timeout);

    return lockFut;
}
/**
 * {@inheritDoc}
 * <p>
 * The key collection is passed on to the DHT cache.
 */
@Override public void unlockAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException {
    final GridDhtCacheAdapter<K, V> target = dht;

    target.unlockAll(keys);
}
/**
 * {@inheritDoc}
 * <p>
 * Records the key/version pair in the remove queue. When adding the pair pushes an older pair out of the
 * queue, the entry identified by that older pair is removed. If the thread is interrupted while adding,
 * the interrupt flag is restored and the failure is only logged at debug level.
 */
@Override public void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver) {
    assert entry.isNear();

    final T2<KeyCacheObject, GridCacheVersion> queued = new T2<>(entry.key(), ver);

    try {
        final T2<KeyCacheObject, GridCacheVersion> displaced = rmvQueue.add(queued);

        if (displaced == null)
            return;

        removeVersionedEntry(displaced.get1(), displaced.get2());
    }
    catch (InterruptedException ignore) {
        if (log.isDebugEnabled())
            log.debug("Failed to enqueue deleted entry [key=" + entry.key() + ", ver=" + ver + ']');

        // Keep the interruption visible to the callers up the stack.
        Thread.currentThread().interrupt();
    }
}
}