blob: bbe7a8fb3436668d8e363c58c135f72f3efa715b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.local.atomic;
import java.io.Externalizable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.ReadRepairStrategy;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheInvokeResult;
import org.apache.ignite.internal.processors.cache.CacheLazyEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException;
import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.LockedEntriesInfo;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.resource.GridResourceIoc;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
/**
* Non-transactional local cache.
*/
public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
/** */
private static final long serialVersionUID = 0L;
/** */
private GridCachePreloader preldr;
/** Locked entries info for each thread. */
private final LockedEntriesInfo lockedEntriesInfo = new LockedEntriesInfo();
/**
* Empty constructor required by {@link Externalizable}.
*/
public GridLocalAtomicCache() {
// No-op.
}
/**
* @param ctx Cache context.
*/
public GridLocalAtomicCache(GridCacheContext<K, V> ctx) {
super(ctx);
preldr = new GridCachePreloaderAdapter(ctx.group());
}
/** {@inheritDoc} */
@Override protected void checkJta() throws IgniteCheckedException {
// No-op.
}
/** {@inheritDoc} */
@Override public boolean isLocal() {
return true;
}
/** {@inheritDoc} */
@Override public GridCachePreloader preloader() {
return preldr;
}
/** {@inheritDoc} */
@Override protected V getAndPut0(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
return (V)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
null,
expiryPerCall(),
true,
false,
filter,
ctx.writeThrough(),
ctx.readThrough(),
opCtx != null && opCtx.isKeepBinary());
}
/** {@inheritDoc} */
@Override protected boolean put0(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
Boolean res = (Boolean)updateAllInternal(UPDATE,
Collections.singleton(key),
Collections.singleton(val),
null,
expiryPerCall(),
false,
false,
filter,
ctx.writeThrough(),
ctx.readThrough(),
opCtx != null && opCtx.isKeepBinary());
assert res != null;
return res;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<V> getAndPutAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
return updateAllAsync0(F0.asMap(key, val),
null,
null,
true,
false,
filter);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
return updateAllAsync0(F0.asMap(key, val),
null,
null,
false,
false,
filter);
}
/** {@inheritDoc} */
@Override protected void putAll0(Map<? extends K, ? extends V> m) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
updateAllInternal(UPDATE,
m.keySet(),
m.values(),
null,
expiryPerCall(),
false,
false,
null,
ctx.writeThrough(),
ctx.readThrough(),
opCtx != null && opCtx.isKeepBinary());
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> putAllAsync0(Map<? extends K, ? extends V> m) {
return updateAllAsync0(m,
null,
null,
false,
false,
null).chain(RET2NULL);
}
/** {@inheritDoc} */
@Override protected V getAndRemove0(K key) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
return (V)updateAllInternal(DELETE,
Collections.singleton(key),
null,
null,
expiryPerCall(),
true,
false,
null,
ctx.writeThrough(),
ctx.readThrough(),
opCtx != null && opCtx.isKeepBinary());
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<V> getAndRemoveAsync0(K key) {
return removeAllAsync0(Collections.singletonList(key), true, false, null);
}
/** {@inheritDoc} */
@Override public void removeAll0(Collection<? extends K> keys) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
updateAllInternal(DELETE,
keys,
null,
null,
expiryPerCall(),
false,
false,
null,
ctx.writeThrough(),
ctx.readThrough(),
opCtx != null && opCtx.isKeepBinary());
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Object> removeAllAsync0(Collection<? extends K> keys) {
return removeAllAsync0(keys, false, false, null).chain(RET2NULL);
}
/** {@inheritDoc} */
@Override public boolean remove0(K key, final CacheEntryPredicate filter) throws IgniteCheckedException {
CacheOperationContext opCtx = ctx.operationContextPerCall();
Boolean rmv = (Boolean)updateAllInternal(DELETE,
Collections.singleton(key),
null,
null,
expiryPerCall(),
false,
false,
filter,
ctx.writeThrough(),
ctx.readThrough(),
opCtx != null && opCtx.isKeepBinary());
assert rmv != null;
return rmv;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Boolean> removeAsync0(K key, @Nullable CacheEntryPredicate filter) {
return removeAllAsync0(Collections.singletonList(key), false, false, filter);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync() {
return ctx.closures().callLocalSafe(new GridPlainCallable<Void>() {
@Override public Void call() throws Exception {
removeAll();
return null;
}
});
}
/** {@inheritDoc} */
@Override protected V get(
final K key,
String taskName,
boolean deserializeBinary,
boolean needVer) throws IgniteCheckedException {
Map<K, V> m = getAllInternal(Collections.singleton(key),
ctx.readThrough(),
taskName,
deserializeBinary,
false,
needVer);
assert m.isEmpty() || m.size() == 1 : m.size();
return F.firstValue(m);
}
/** {@inheritDoc} */
@Override public final Map<K, V> getAll(
Collection<? extends K> keys,
boolean deserializeBinary,
boolean needVer,
boolean recovery,
ReadRepairStrategy readRepairStrategy) throws IgniteCheckedException {
A.notNull(keys, "keys");
String taskName = ctx.kernalContext().job().currentTaskName();
return getAllInternal(keys,
ctx.readThrough(),
taskName,
deserializeBinary,
false,
needVer);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
@Nullable final Collection<? extends K> keys,
final boolean forcePrimary,
boolean skipTx,
final String taskName,
final boolean deserializeBinary,
boolean recovery,
ReadRepairStrategy readRepairStrategy,
final boolean skipVals,
final boolean needVer
) {
A.notNull(keys, "keys");
final boolean storeEnabled = ctx.readThrough();
return asyncOp(new GridPlainCallable<Map<K, V>>() {
@Override public Map<K, V> call() throws Exception {
return getAllInternal(keys, storeEnabled, taskName, deserializeBinary, skipVals, needVer);
}
});
}
/**
* Entry point to all public API get methods.
*
* @param keys Keys to remove.
* @param storeEnabled Store enabled flag.
* @param taskName Task name.
* @param deserializeBinary Deserialize binary .
* @param skipVals Skip value flag.
* @param needVer Need version.
* @return Key-value map.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("ConstantConditions")
private Map<K, V> getAllInternal(@Nullable Collection<? extends K> keys,
boolean storeEnabled,
String taskName,
boolean deserializeBinary,
boolean skipVals,
boolean needVer
) throws IgniteCheckedException {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
if (F.isEmpty(keys))
return Collections.emptyMap();
CacheOperationContext opCtx = ctx.operationContextPerCall();
Map<K, V> vals = U.newHashMap(keys.size());
warnIfUnordered(keys, BulkOperation.GET);
final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null);
boolean success = true;
boolean readNoEntry = ctx.readNoEntry(expiry, false);
final boolean evt = !skipVals;
ctx.shared().database().checkpointReadLock();
try {
for (K key : keys) {
if (key == null)
throw new NullPointerException("Null key.");
KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
boolean skipEntry = readNoEntry;
if (readNoEntry) {
CacheDataRow row = ctx.offheap().read(ctx, cacheKey);
if (row != null) {
long expireTime = row.expireTime();
if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
ctx.addResult(vals,
cacheKey,
row.value(),
skipVals,
false,
deserializeBinary,
true,
null,
row.version(),
0,
0,
needVer,
null);
if (ctx.statisticsEnabled() && !skipVals)
metrics0().onRead(true);
if (evt) {
ctx.events().readEvent(cacheKey,
null,
null,
row.value(),
taskName,
!deserializeBinary);
}
}
else
skipEntry = false;
}
else
success = false;
}
if (!skipEntry) {
GridCacheEntryEx entry = null;
while (true) {
try {
entry = entryEx(cacheKey);
if (entry != null) {
CacheObject v;
if (needVer) {
EntryGetResult res = entry.innerGetVersioned(
null,
null,
/*update-metrics*/false,
/*event*/evt,
null,
taskName,
expiry,
!deserializeBinary,
null);
if (res != null) {
ctx.addResult(
vals,
cacheKey,
res,
skipVals,
false,
deserializeBinary,
true,
needVer);
}
else
success = false;
}
else {
v = entry.innerGet(
null,
null,
/*read-through*/false,
/*update-metrics*/true,
/*event*/evt,
null,
taskName,
expiry,
!deserializeBinary);
if (v != null) {
ctx.addResult(vals,
cacheKey,
v,
skipVals,
false,
deserializeBinary,
true,
null,
0,
0,
null);
}
else
success = false;
}
}
break; // While.
}
catch (GridCacheEntryRemovedException ignored) {
// No-op, retry.
}
finally {
if (entry != null)
entry.touch();
}
if (!success && storeEnabled)
break;
}
}
if (!success) {
if (!storeEnabled && ctx.statisticsEnabled() && !skipVals)
metrics0().onRead(false);
}
}
}
finally {
ctx.shared().database().checkpointReadUnlock();
}
if (success || !storeEnabled)
return vals;
return getAllAsync(
keys,
null,
opCtx == null || !opCtx.skipStore(),
false,
taskName,
deserializeBinary,
opCtx != null && opCtx.recovery(),
null,
/*force primary*/false,
expiry,
skipVals,
needVer).get();
}
/** {@inheritDoc} */
@Override public <T> EntryProcessorResult<T> invoke(K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args) throws IgniteCheckedException {
return invokeAsync(key, entryProcessor, args).get();
}
/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
final EntryProcessor<K, V, T> entryProcessor,
Object... args) throws IgniteCheckedException {
A.notNull(keys, "keys", entryProcessor, "entryProcessor");
warnIfUnordered(keys, BulkOperation.INVOKE);
final boolean statsEnabled = ctx.statisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
@Override public EntryProcessor apply(K k) {
return entryProcessor;
}
});
CacheOperationContext opCtx = ctx.operationContextPerCall();
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
Map<K, EntryProcessorResult<T>> entryProcessorRes = (Map<K, EntryProcessorResult<T>>)updateAllInternal(
TRANSFORM,
invokeMap.keySet(),
invokeMap.values(),
args,
expiryPerCall(),
false,
false,
null,
ctx.writeThrough(),
ctx.readThrough(),
keepBinary);
if (statsEnabled)
metrics0().addInvokeTimeNanos(System.nanoTime() - start);
return entryProcessorRes;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <T> IgniteInternalFuture<EntryProcessorResult<T>> invokeAsync(K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args) throws EntryProcessorException {
A.notNull(key, "key", entryProcessor, "entryProcessor");
final boolean statsEnabled = ctx.statisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
Map<? extends K, EntryProcessor> invokeMap =
Collections.singletonMap(key, (EntryProcessor)entryProcessor);
IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null,
invokeMap,
args,
false,
false,
null);
return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
@Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut)
throws IgniteCheckedException {
Map<K, EntryProcessorResult<T>> resMap = fut.get();
if (statsEnabled)
metrics0().addInvokeTimeNanos(System.nanoTime() - start);
if (resMap != null) {
assert resMap.isEmpty() || resMap.size() == 1 : resMap.size();
return resMap.isEmpty() ? new CacheInvokeResult<>() : resMap.values().iterator().next();
}
return new CacheInvokeResult<>();
}
});
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
Set<? extends K> keys,
final EntryProcessor<K, V, T> entryProcessor,
Object... args) {
A.notNull(keys, "keys", entryProcessor, "entryProcessor");
warnIfUnordered(keys, BulkOperation.INVOKE);
final boolean statsEnabled = ctx.statisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
@Override public EntryProcessor apply(K k) {
return entryProcessor;
}
});
IgniteInternalFuture fut = updateAllAsync0(null,
invokeMap,
args,
true,
false,
null);
if (statsEnabled)
fut.listen(new InvokeAllTimeStatClosure(metrics0(), start));
return fut;
}
/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
Object... args) throws IgniteCheckedException {
A.notNull(map, "map");
warnIfUnordered(map, BulkOperation.INVOKE);
final boolean statsEnabled = ctx.statisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
CacheOperationContext opCtx = ctx.operationContextPerCall();
Map<K, EntryProcessorResult<T>> entryProcessorResult = (Map<K, EntryProcessorResult<T>>)updateAllInternal(
TRANSFORM,
map.keySet(),
map.values(),
args,
expiryPerCall(),
false,
false,
null,
ctx.writeThrough(),
ctx.readThrough(),
opCtx != null && opCtx.isKeepBinary());
if (statsEnabled)
metrics0().addInvokeTimeNanos(System.nanoTime() - start);
return entryProcessorResult;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
Object... args) {
A.notNull(map, "map");
warnIfUnordered(map, BulkOperation.INVOKE);
final boolean statsEnabled = ctx.statisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
IgniteInternalFuture fut = updateAllAsync0(null,
map,
args,
true,
false,
null);
if (statsEnabled)
fut.listen(new InvokeAllTimeStatClosure(metrics0(), start));
return fut;
}
/**
* Entry point for public API update methods.
*
* @param map Put map. Either {@code map} or {@code invokeMap} should be passed.
* @param invokeMap Transform map. Either {@code map} or {@code invokeMap} should be passed.
* @param invokeArgs Optional arguments for EntryProcessor.
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
* @param filter Cache entry filter for atomic updates.
* @return Completion future.
*/
private IgniteInternalFuture updateAllAsync0(
@Nullable final Map<? extends K, ? extends V> map,
@Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap,
@Nullable final Object[] invokeArgs,
final boolean retval,
final boolean rawRetval,
@Nullable final CacheEntryPredicate filter
) {
final GridCacheOperation op = invokeMap != null ? TRANSFORM : UPDATE;
final Collection<? extends K> keys =
map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : null;
final Collection<?> vals = map != null ? map.values() : invokeMap != null ? invokeMap.values() : null;
final boolean writeThrough = ctx.writeThrough();
final boolean readThrough = ctx.readThrough();
CacheOperationContext opCtx = ctx.operationContextPerCall();
final ExpiryPolicy expiry = expiryPerCall();
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
return asyncOp(new GridPlainCallable<Object>() {
@Override public Object call() throws Exception {
return updateAllInternal(op,
keys,
vals,
invokeArgs,
expiry,
retval,
rawRetval,
filter,
writeThrough,
readThrough,
keepBinary);
}
});
}
/**
* Entry point for public API remove methods.
*
* @param keys Keys to remove.
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
* @param filter Cache entry filter.
* @return Completion future.
*/
private IgniteInternalFuture removeAllAsync0(
@Nullable final Collection<? extends K> keys,
final boolean retval,
final boolean rawRetval,
@Nullable final CacheEntryPredicate filter
) {
final boolean writeThrough = ctx.writeThrough();
final boolean readThrough = ctx.readThrough();
final ExpiryPolicy expiryPlc = expiryPerCall();
CacheOperationContext opCtx = ctx.operationContextPerCall();
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
return asyncOp(new GridPlainCallable<Object>() {
@Override public Object call() throws Exception {
return updateAllInternal(DELETE,
keys,
null,
null,
expiryPlc,
retval,
rawRetval,
filter,
writeThrough,
readThrough,
keepBinary);
}
});
}
/**
* Entry point for all public update methods (put, remove, invoke).
*
* @param op Operation.
* @param keys Keys.
* @param vals Values.
* @param invokeArgs Optional arguments for EntryProcessor.
* @param expiryPlc Expiry policy.
* @param retval Return value required flag.
* @param rawRetval Return {@code GridCacheReturn} instance.
* @param filter Cache entry filter.
* @param writeThrough Write through.
* @param readThrough Read through.
* @return Update result.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
private Object updateAllInternal(GridCacheOperation op,
Collection<? extends K> keys,
@Nullable Iterable<?> vals,
@Nullable Object[] invokeArgs,
@Nullable ExpiryPolicy expiryPlc,
boolean retval,
boolean rawRetval,
CacheEntryPredicate filter,
boolean writeThrough,
boolean readThrough,
boolean keepBinary
) throws IgniteCheckedException {
if (op == DELETE)
ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
else
ctx.checkSecurity(SecurityPermission.CACHE_PUT);
String taskName = ctx.kernalContext().job().currentTaskName();
GridCacheVersion ver = nextVersion();
CacheEntryPredicate[] filters = CU.filterArray(filter);
IgniteBiTuple<Boolean, ?> res = null;
CachePartialUpdateCheckedException err = null;
ctx.shared().database().checkpointReadLock();
try {
ctx.shared().database().ensureFreeSpace(ctx.dataRegion());
if (writeThrough && keys.size() > 1) {
return updateWithBatch(op,
keys,
vals,
invokeArgs,
expiryPlc,
ver,
filters,
keepBinary,
taskName);
}
Iterator<?> valsIter = vals != null ? vals.iterator() : null;
boolean intercept = ctx.config().getInterceptor() != null;
for (K key : keys) {
if (key == null)
throw new NullPointerException("Null key.");
Object val = valsIter != null ? valsIter.next() : null;
if (val == null && op != DELETE)
throw new NullPointerException("Null value.");
KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
if (op == UPDATE) {
val = ctx.toCacheObject(val);
ctx.validateKeyAndValue(cacheKey, (CacheObject)val);
}
else if (op == TRANSFORM)
ctx.kernalContext().resource().inject(val, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, ctx.name());
while (true) {
GridCacheEntryEx entry = null;
try {
entry = entryEx(cacheKey);
GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
ver,
val == null ? DELETE : op,
val,
invokeArgs,
writeThrough,
readThrough,
retval,
keepBinary,
expiryPlc,
true,
true,
filters,
intercept,
taskName,
false);
if (op == TRANSFORM) {
if (t.get3() != null) {
Map<K, EntryProcessorResult> computedMap;
if (res == null) {
computedMap = U.newHashMap(keys.size());
res = new IgniteBiTuple<>(true, computedMap);
}
else
computedMap = (Map<K, EntryProcessorResult>)res.get2();
computedMap.put(key, t.get3());
}
}
else if (res == null)
res = new T2(t.get1(), t.get2());
break; // While.
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got removed entry while updating (will retry): " + key);
entry = null;
}
catch (IgniteCheckedException e) {
if (err == null)
err = partialUpdateException();
err.add(F.asList(key), e);
U.error(log, "Failed to update key : " + key, e);
break;
}
finally {
if (entry != null)
entry.touch();
}
}
}
}
finally {
ctx.shared().database().checkpointReadUnlock();
}
if (err != null)
throw err;
Object ret = res == null ? null : rawRetval ? new GridCacheReturn(
ctx,
true,
keepBinary,
U.deploymentClassLoader(ctx.kernalContext(), U.contextDeploymentClassLoaderId(ctx.kernalContext())),
res.get2(),
res.get1()
) : (retval || op == TRANSFORM) ? res.get2() : res.get1();
if (op == TRANSFORM && ret == null)
ret = Collections.emptyMap();
return ret;
}
/**
* Updates entries using batched write-through.
*
* @param op Operation.
* @param keys Keys.
* @param vals Values.
* @param invokeArgs Optional arguments for EntryProcessor.
* @param expiryPlc Expiry policy.
* @param ver Cache version.
* @param filter Optional filter.
* @param taskName Task name.
* @return Results map for invoke operation.
* @throws CachePartialUpdateCheckedException If update failed.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"})
private Map<K, EntryProcessorResult> updateWithBatch(
GridCacheOperation op,
Collection<? extends K> keys,
@Nullable Iterable<?> vals,
@Nullable Object[] invokeArgs,
@Nullable ExpiryPolicy expiryPlc,
GridCacheVersion ver,
@Nullable CacheEntryPredicate[] filter,
boolean keepBinary,
String taskName
) throws IgniteCheckedException {
List<GridCacheEntryEx> locked = lockEntries(keys);
try {
int size = locked.size();
Map<KeyCacheObject, CacheObject> putMap = null;
Collection<KeyCacheObject> rmvKeys = null;
List<CacheObject> writeVals = null;
Map<K, EntryProcessorResult> invokeResMap =
op == TRANSFORM ? U.<K, EntryProcessorResult>newHashMap(size) : null;
List<GridCacheEntryEx> filtered = new ArrayList<>(size);
CachePartialUpdateCheckedException err = null;
Iterator<?> valsIter = vals != null ? vals.iterator() : null;
boolean intercept = ctx.config().getInterceptor() != null;
for (int i = 0; i < size; i++) {
GridCacheEntryEx entry = locked.get(i);
Object val = valsIter != null ? valsIter.next() : null;
if (val == null && op != DELETE)
throw new NullPointerException("Null value.");
try {
try {
if (!ctx.isAllLocked(entry, filter)) {
if (log.isDebugEnabled())
log.debug("Entry did not pass the filter (will skip write) [entry=" + entry +
", filter=" + Arrays.toString(filter) + ']');
continue;
}
}
catch (IgniteCheckedException e) {
if (err == null)
err = partialUpdateException();
err.add(F.asList(entry.key()), e);
continue;
}
if (op == TRANSFORM) {
ctx.kernalContext().resource().inject(val,
GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR,
ctx.name());
EntryProcessor<Object, Object, Object> entryProcessor =
(EntryProcessor<Object, Object, Object>)val;
CacheObject old = entry.innerGet(
null,
null,
/*read-through*/true,
/*update-metrics*/true,
/*event*/true,
entryProcessor,
taskName,
null,
keepBinary);
Object oldVal = null;
CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(entry.key(), old,
entry.version(), keepBinary, entry);
CacheObject updated;
Object updatedVal = null;
CacheInvokeResult invokeRes = null;
boolean validation = false;
IgniteThread.onEntryProcessorEntered(false);
try {
Object computed = entryProcessor.process(invokeEntry, invokeArgs);
updatedVal = ctx.unwrapTemporary(invokeEntry.getValue());
updated = ctx.toCacheObject(updatedVal);
if (computed != null)
invokeRes = CacheInvokeResult.fromResult(ctx.unwrapTemporary(computed));
if (invokeEntry.modified() && updated != null) {
validation = true;
ctx.validateKeyAndValue(entry.key(), updated);
}
else if (ctx.statisticsEnabled() && !invokeEntry.modified())
ctx.cache().metrics0().onReadOnlyInvoke(old != null);
}
catch (Exception e) {
invokeRes = CacheInvokeResult.fromError(e);
updated = old;
if (validation) {
invokeResMap.put((K)entry.key().value(ctx.cacheObjectContext(), false), invokeRes);
continue;
}
}
finally {
IgniteThread.onEntryProcessorLeft();
}
if (invokeRes != null)
invokeResMap.put((K)entry.key().value(ctx.cacheObjectContext(), false), invokeRes);
if (updated == null) {
if (intercept) {
IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor()
.onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(),
old, oldVal, keepBinary));
if (ctx.cancelRemove(interceptorRes))
continue;
}
// Update previous batch.
if (putMap != null) {
err = updatePartialBatch(
filtered,
ver,
writeVals,
putMap,
null,
expiryPlc,
keepBinary,
err,
taskName,
true);
putMap = null;
writeVals = null;
filtered = new ArrayList<>();
}
// Start collecting new batch.
if (rmvKeys == null)
rmvKeys = new ArrayList<>(size);
rmvKeys.add(entry.key());
}
else {
if (intercept) {
Object interceptorVal = ctx.config().getInterceptor()
.onBeforePut(new CacheLazyEntry(ctx, entry.key(), invokeEntry.getKey(),
old, oldVal, keepBinary), updatedVal);
if (interceptorVal == null)
continue;
updated = ctx.toCacheObject(ctx.unwrapTemporary(interceptorVal));
}
// Update previous batch.
if (rmvKeys != null) {
err = updatePartialBatch(
filtered,
ver,
null,
null,
rmvKeys,
expiryPlc,
keepBinary,
err,
taskName,
true);
rmvKeys = null;
filtered = new ArrayList<>();
}
if (putMap == null) {
putMap = new LinkedHashMap<>(size, 1.0f);
writeVals = new ArrayList<>(size);
}
putMap.put(entry.key(), updated);
writeVals.add(updated);
}
}
else if (op == UPDATE) {
CacheObject cacheVal = ctx.toCacheObject(val);
if (intercept) {
CacheObject old = entry.innerGet(
null,
null,
/*read-through*/ctx.loadPreviousValue(),
/*update-metrics*/true,
/*event*/true,
null,
taskName,
null,
keepBinary);
Object interceptorVal = ctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(
ctx, entry.key(), old, keepBinary), val);
if (interceptorVal == null)
continue;
cacheVal = ctx.toCacheObject(ctx.unwrapTemporary(interceptorVal));
}
ctx.validateKeyAndValue(entry.key(), cacheVal);
if (putMap == null) {
putMap = new LinkedHashMap<>(size, 1.0f);
writeVals = new ArrayList<>(size);
}
putMap.put(entry.key(), cacheVal);
writeVals.add(cacheVal);
}
else {
assert op == DELETE;
if (intercept) {
CacheObject old = entry.innerGet(
null,
null,
/*read-through*/ctx.loadPreviousValue(),
/*update-metrics*/true,
/*event*/true,
null,
taskName,
null,
keepBinary);
IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor()
.onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, keepBinary));
if (ctx.cancelRemove(interceptorRes))
continue;
}
if (rmvKeys == null)
rmvKeys = new ArrayList<>(size);
rmvKeys.add(entry.key());
}
filtered.add(entry);
}
catch (IgniteCheckedException e) {
if (err == null)
err = partialUpdateException();
err.add(F.asList(entry.key()), e);
}
catch (GridCacheEntryRemovedException ignore) {
assert false : "Entry cannot become obsolete while holding lock.";
}
}
// Store final batch.
if (putMap != null || rmvKeys != null) {
err = updatePartialBatch(
filtered,
ver,
writeVals,
putMap,
rmvKeys,
expiryPlc,
keepBinary,
err,
taskName,
op == TRANSFORM);
}
else
assert filtered.isEmpty();
if (err != null)
throw err;
return invokeResMap;
}
finally {
unlockEntries(locked);
}
}
/**
* @param entries Entries to update.
* @param ver Cache version.
* @param writeVals Cache values.
* @param putMap Values to put.
* @param rmvKeys Keys to remove.
* @param expiryPlc Expiry policy.
* @param err Optional partial update exception.
* @param taskName Task name.
* @param transformed {@code True} if transform operation performed.
* @return Partial update exception.
*/
@SuppressWarnings({"unchecked", "ConstantConditions"})
@Nullable private CachePartialUpdateCheckedException updatePartialBatch(
List<GridCacheEntryEx> entries,
final GridCacheVersion ver,
@Nullable List<CacheObject> writeVals,
@Nullable Map<KeyCacheObject, CacheObject> putMap,
@Nullable Collection<KeyCacheObject> rmvKeys,
@Nullable ExpiryPolicy expiryPlc,
boolean keepBinary,
@Nullable CachePartialUpdateCheckedException err,
String taskName,
boolean transformed) {
assert putMap == null ^ rmvKeys == null;
GridCacheOperation op;
CacheStorePartialUpdateException storeErr = null;
try {
if (putMap != null) {
try {
Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> view = F.viewReadOnly(putMap,
new C1<CacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>>() {
@Override public IgniteBiTuple<? extends CacheObject, GridCacheVersion> apply(CacheObject val) {
return F.t(val, ver);
}
});
ctx.store().putAll(null, view);
}
catch (CacheStorePartialUpdateException e) {
storeErr = e;
}
op = UPDATE;
}
else {
try {
ctx.store().removeAll(null, rmvKeys);
}
catch (CacheStorePartialUpdateException e) {
storeErr = e;
}
op = DELETE;
}
}
catch (IgniteCheckedException e) {
if (err == null)
err = partialUpdateException();
err.add(putMap != null ? putMap.keySet() : rmvKeys, e);
return err;
}
boolean intercept = ctx.config().getInterceptor() != null;
for (int i = 0; i < entries.size(); i++) {
GridCacheEntryEx entry = entries.get(i);
assert entry.lockedByCurrentThread();
if (entry.obsolete() ||
(storeErr != null && storeErr.failedKeys().contains(entry.key().value(ctx.cacheObjectContext(), false))))
continue;
try {
// We are holding java-level locks on entries at this point.
CacheObject writeVal = op == UPDATE ? writeVals.get(i) : null;
assert writeVal != null || op == DELETE : "null write value found.";
GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
ver,
op,
writeVal,
null,
false,
false,
false,
keepBinary,
expiryPlc,
true,
true,
null,
false,
taskName,
transformed);
if (intercept) {
if (op == UPDATE)
ctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(ctx, entry.key(), writeVal, keepBinary));
else
ctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(ctx, entry.key(), t.get2(), keepBinary));
}
}
catch (GridCacheEntryRemovedException ignore) {
assert false : "Entry cannot become obsolete while holding lock.";
}
catch (IgniteCheckedException e) {
if (err == null)
err = partialUpdateException();
err.add(Collections.singleton(entry.key()), e);
}
}
return err;
}
/**
* Acquires java-level locks on cache entries.
*
* @param keys Keys to lock.
* @return Collection of locked entries.
*/
private List<GridCacheEntryEx> lockEntries(Collection<? extends K> keys) {
GridCacheEntryEx[] locked = new GridCacheEntryEx[keys.size()];
boolean nullKeys = false;
while (true) {
int i = 0;
for (K key : keys) {
if (key == null) {
nullKeys = true;
break;
}
GridCacheEntryEx entry = entryEx(ctx.toCacheKeyObject(key));
locked[i++] = entry;
}
if (nullKeys)
break;
if (lockedEntriesInfo.tryLockEntries(locked))
return Arrays.asList(locked);
}
assert nullKeys;
AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
for (GridCacheEntryEx entry : locked) {
if (entry != null)
entry.touch();
}
throw new NullPointerException("Null key.");
}
/**
* Releases java-level locks on cache entries.
*
* @param locked Locked entries.
*/
private void unlockEntries(Iterable<GridCacheEntryEx> locked) {
for (GridCacheEntryEx entry : locked)
entry.unlockEntry();
AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
for (GridCacheEntryEx entry : locked)
entry.touch();
}
/**
* @return New partial update exception.
*/
private static CachePartialUpdateCheckedException partialUpdateException() {
return new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<KeyCacheObject> keys,
long timeout,
IgniteTxLocalEx tx,
boolean isRead,
boolean retval,
TransactionIsolation isolation,
boolean invalidate,
long createTtl,
long accessTtl) {
return new GridFinishedFuture<>(new UnsupportedOperationException("Locks are not supported for " +
"CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)"));
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys,
long timeout) {
return new GridFinishedFuture<>(new UnsupportedOperationException("Locks are not supported for " +
"CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)"));
}
/** {@inheritDoc} */
@Override public void unlockAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException {
throw new UnsupportedOperationException("Locks are not supported for " +
"CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)");
}
/**
* @return Expiry policy.
*/
@Nullable private ExpiryPolicy expiryPerCall() {
CacheOperationContext opCtx = ctx.operationContextPerCall();
ExpiryPolicy expiry = opCtx != null ? opCtx.expiry() : null;
if (expiry == null)
expiry = ctx.expiry();
return expiry;
}
/**
* @param op Operation closure.
* @return Future.
*/
@SuppressWarnings("unchecked")
private IgniteInternalFuture asyncOp(final Callable<?> op) {
IgniteInternalFuture fail = asyncOpAcquire(/*retry*/false);
if (fail != null)
return fail;
IgniteInternalFuture f = ctx.closures().callLocalSafe(op);
f.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
asyncOpRelease(false);
}
});
return f;
}
/** {@inheritDoc} */
@Override public void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver) {
assert false : "Should not be called";
}
}