blob: 6bc86b9f6ff68b38ff27f83547f7c02d21d49d3b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Externalizable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
/**
* Distributed cache implementation.
*/
public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter<K, V> {
/** */
private static final long serialVersionUID = 0L;
/**
* Empty constructor required by {@link Externalizable}.
* It performs no initialization of its own.
*/
protected GridDistributedCacheAdapter() {
// No-op.
}
/**
* Creates the adapter by passing the cache context to the superclass constructor.
*
* @param ctx Cache context.
*/
protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx) {
super(ctx);
}
/**
* Creates the adapter by passing the cache context and the cache map to the superclass constructor.
*
* @param ctx Cache context.
* @param map Cache map.
*/
protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) {
super(ctx, map);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> txLockAsync(
    Collection<KeyCacheObject> keys,
    long timeout,
    IgniteTxLocalEx tx,
    boolean isRead,
    boolean retval,
    TransactionIsolation isolation,
    boolean isInvalidate,
    long createTtl,
    long accessTtl
) {
    assert tx != null;

    // The delegate expects the invalidation flag before the read flag, unlike this method's signature.
    IgniteInternalFuture<Boolean> lockFut = lockAllAsync(
        keys,
        timeout,
        tx,
        isInvalidate,
        isRead,
        retval,
        isolation,
        createTtl,
        accessTtl);

    return lockFut;
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout) {
    final IgniteTxLocalEx userTx = ctx.tm().userTx();

    final Collection<KeyCacheObject> cacheKeys = ctx.cacheKeysView(keys);

    // Explicit locks always bring values back, hence the return value flag is set.
    final boolean retval = true;

    return lockAllAsync(
        cacheKeys,
        timeout,
        userTx,
        /*isInvalidate*/false,
        /*isRead*/false,
        retval,
        /*isolation*/null,
        /*createTtl*/-1L,
        /*accessTtl*/-1L);
}
/**
* Acquires locks on the given keys and reports the outcome through the returned future.
* <p>
* Both {@code txLockAsync} and the public two-argument {@code lockAllAsync} of this class delegate here;
* the latter passes {@code null} isolation and {@code -1} for both TTL values.
*
* @param keys Keys to lock.
* @param timeout Timeout.
* @param tx Transaction (may be {@code null}).
* @param isInvalidate Invalidation flag.
* @param isRead Indicates whether value is read or written.
* @param retval Flag to return value.
* @param isolation Transaction isolation (may be {@code null}).
* @param createTtl TTL for create operation.
* @param accessTtl TTL for read operation.
* @return Future for locks.
*/
protected abstract IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
long timeout,
@Nullable IgniteTxLocalEx tx,
boolean isInvalidate,
boolean isRead,
boolean retval,
@Nullable TransactionIsolation isolation,
long createTtl,
long accessTtl);
/**
 * Removes the entry for the given key, provided the entry is present and can be
 * marked obsolete with the given version.
 *
 * @param key Key to remove.
 * @param ver Version to remove.
 */
public void removeVersionedEntry(KeyCacheObject key, GridCacheVersion ver) {
    GridCacheEntryEx cached = peekEx(key);

    // Nothing to do when the entry is absent or cannot be marked obsolete.
    if (cached != null && cached.markObsoleteVersion(ver))
        removeEntry(cached);
}
/** {@inheritDoc} */
@Override public void removeAll() throws IgniteCheckedException {
    try {
        CacheOperationContext opCtx = ctx.operationContextPerCall();

        boolean skipStore = opCtx != null && opCtx.skipStore();
        boolean keepBinary = opCtx != null && opCtx.isKeepBinary();

        while (true) {
            boolean incomplete = false;

            AffinityTopologyVersion startVer = ctx.affinity().affinityTopologyVersion();

            // The remove job is sent to every data node of this cache.
            Collection<ClusterNode> dataNodes = ctx.grid().cluster().forDataNodes(name()).nodes();

            if (!dataNodes.isEmpty()) {
                ctx.kernalContext().task().setThreadContext(TC_SUBGRID, dataNodes);

                incomplete = !ctx.kernalContext().task().execute(
                    new RemoveAllTask(ctx.name(), startVer, skipStore, keepBinary), null).get();
            }

            // Done only when topology stayed the same during the attempt and the task reported success.
            if (ctx.affinity().affinityTopologyVersion().compareTo(startVer) == 0 && !incomplete)
                break;
        }
    }
    catch (ClusterGroupEmptyCheckedException ignore) {
        if (log.isDebugEnabled())
            log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]");
    }
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync() {
    GridFutureAdapter<Void> resFut = new GridFutureAdapter<>();

    AffinityTopologyVersion curTopVer = ctx.affinity().affinityTopologyVersion();

    CacheOperationContext opCtx = ctx.operationContextPerCall();

    // Without a per-call operation context both flags default to false.
    boolean skipStore = opCtx != null && opCtx.skipStore();
    boolean keepBinary = opCtx != null && opCtx.isKeepBinary();

    removeAllAsync(resFut, curTopVer, skipStore, keepBinary);

    return resFut;
}
/**
 * Runs one asynchronous remove-all attempt on all data nodes of this cache and re-runs itself
 * until an attempt succeeds without the affinity topology version changing in the meantime.
 * The outcome is reported through {@code opFut}; if the cache has no data nodes the future is
 * completed immediately.
 *
 * @param opFut Future to complete when the whole operation finishes or fails.
 * @param topVer Topology version the attempt is started on.
 * @param skipStore Skip store flag.
 * @param keepBinary Keep binary flag.
 */
private void removeAllAsync(
    final GridFutureAdapter<Void> opFut,
    final AffinityTopologyVersion topVer,
    final boolean skipStore,
    final boolean keepBinary
) {
    Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();

    if (nodes.isEmpty()) {
        opFut.onDone();

        return;
    }

    ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);

    IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute(
        new RemoveAllTask(ctx.name(), topVer, skipStore, keepBinary), null);

    rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
        @Override public void apply(IgniteInternalFuture<Boolean> fut) {
            try {
                boolean retry = !fut.get();

                AffinityTopologyVersion topVer0 = ctx.affinity().affinityTopologyVersion();

                if (topVer0.equals(topVer) && !retry)
                    opFut.onDone();
                else
                    removeAllAsync(opFut, topVer0, skipStore, keepBinary);
            }
            catch (ClusterGroupEmptyCheckedException ignore) {
                if (log.isDebugEnabled())
                    log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]");

                opFut.onDone();
            }
            catch (IgniteCheckedException e) {
                opFut.onDone(e);
            }
            catch (RuntimeException | Error e) {
                // Unchecked failures (e.g. thrown by the retry call above) must also complete the
                // operation future, otherwise callers waiting on it would never be released.
                opFut.onDone(e);

                throw e;
            }
        }
    });
}
/** {@inheritDoc} */
@Override public long localSizeLong(CachePeekMode[] peekModes) throws IgniteCheckedException {
    PeekModes modes = parsePeekModes(peekModes, true);

    long total = 0;

    if (modes.near)
        total += nearSize();

    // Swap and offheap are disabled for near cache.
    if (!modes.primary && !modes.backup)
        return total;

    AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();

    IgniteCacheOffheapManager offheapMgr = ctx.offheap();

    if (modes.offheap)
        return total + offheapMgr.cacheEntriesCount(ctx.cacheId(), modes.primary, modes.backup, topVer);

    if (!modes.heap)
        return total;

    // Heap-only mode: sum up public sizes of local partitions matching the requested roles.
    for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions()) {
        if ((modes.primary && part.primary(topVer)) || (modes.backup && part.backup(topVer)))
            total += part.publicSize(ctx.cacheId());
    }

    return total;
}
/** {@inheritDoc} */
@Override public long localSizeLong(int part, CachePeekMode[] peekModes) throws IgniteCheckedException {
    PeekModes modes = parsePeekModes(peekModes, true);

    long total = 0;

    if (modes.near)
        total += nearSize();

    // Swap and offheap are disabled for near cache.
    if (!modes.offheap)
        return total;

    AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();

    IgniteCacheOffheapManager offheapMgr = ctx.offheap();

    // The partition is counted only if the local node holds it in one of the requested roles.
    if ((ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer) && modes.primary) ||
        (ctx.affinity().backupByPartition(ctx.localNode(), part, topVer) && modes.backup))
        total += offheapMgr.cacheEntriesCount(ctx.cacheId(), part);

    return total;
}
/** {@inheritDoc} */
@Override public String toString() {
    // The superclass representation is embedded under the "super" property.
    String parentStr = super.toString();

    return S.toString(GridDistributedCacheAdapter.class, this, "super", parentStr);
}
/**
 * Task that maps one {@link GlobalRemoveAllJob} to every node of the subgrid and reduces the
 * job results to a single success flag.
 */
@GridInternal
private static class RemoveAllTask extends ComputeTaskAdapter<Object, Boolean> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Cache name. */
    private final String cacheName;

    /** Affinity topology version. */
    private final AffinityTopologyVersion topVer;

    /** Skip store flag. */
    private final boolean skipStore;

    /** Keep binary flag. */
    private final boolean keepBinary;

    /**
     * @param cacheName Cache name.
     * @param topVer Affinity topology version.
     * @param skipStore Skip store flag.
     * @param keepBinary Keep binary flag.
     */
    public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore, boolean keepBinary) {
        this.cacheName = cacheName;
        this.topVer = topVer;
        this.skipStore = skipStore;
        this.keepBinary = keepBinary;
    }

    /** {@inheritDoc} */
    @NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
        @Nullable Object arg) throws IgniteException {
        Map<ComputeJob, ClusterNode> jobsByNode = new HashMap<>();

        // Every node of the subgrid gets its own job instance.
        for (ClusterNode target : subgrid) {
            ComputeJob job = new GlobalRemoveAllJob(cacheName, topVer, skipStore, keepBinary);

            jobsByNode.put(job, target);
        }

        return jobsByNode;
    }

    /** {@inheritDoc} */
    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
        IgniteException err = res.getException();

        // Topology failures are tolerated here; any other job failure aborts the task.
        if (err != null && !(err instanceof ClusterTopologyException))
            throw new IgniteException("Remote job threw exception.", err);

        return ComputeJobResultPolicy.WAIT;
    }

    /** {@inheritDoc} */
    @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
        for (ComputeJobResult jobRes : results) {
            if (jobRes == null)
                continue;

            // A failed job or a job that returned false makes the whole task unsuccessful.
            if (jobRes.getException() != null)
                return false;

            if (!jobRes.<Boolean>getData())
                return false;
        }

        return true;
    }
}
/**
* Internal job which performs remove all primary key mappings
* operation on a cache with the given name.
* <p>
* The job walks the partitions that are primary on the local node for the requested topology version
* and removes their keys through a data streamer. It returns {@code false} when the request has to be
* repeated by the sender (topology version mismatch, or a partition that cannot be processed).
*/
@GridInternal
public static class GlobalRemoveAllJob<K, V> extends TopologyVersionAwareJob {
/** */
private static final long serialVersionUID = 0L;
/** Skip store flag. */
private final boolean skipStore;
/** Keep binary flag. */
private final boolean keepBinary;
/**
* Future completed with the outcome of this job's local execution. It is also published through
* {@code ctx.lastRemoveAllJobFut()} so that concurrently running {@link GlobalRemoveAllJob}s can
* detect each other. The field is transient and is created lazily in {@code localExecute}.
*/
private transient GridFutureAdapter<Boolean> locFut;
/**
* @param cacheName Cache name.
* @param topVer Topology version.
* @param skipStore Skip store flag.
* @param keepBinary Keep binary flag.
*/
private GlobalRemoveAllJob(
String cacheName,
@NotNull AffinityTopologyVersion topVer,
boolean skipStore,
boolean keepBinary
) {
super(cacheName, topVer);
this.skipStore = skipStore;
this.keepBinary = keepBinary;
}
/**
* {@inheritDoc}
* <p>
* Returns {@code true} when the cache is not found locally or all local primary partitions were processed,
* {@code false} when the local affinity topology version differs from the requested one or a partition
* is missing, not owned (with rebalance enabled) or cannot be reserved, and the value of
* {@code jobCtx.holdcc()} when the job yields to a concurrently registered remove-all job.
*/
@Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache0) {
// The future is transient, so it has to be created on the executing node.
if (locFut == null)
locFut = new GridFutureAdapter<>();
// An already completed future means the outcome is known; report it instead of removing again.
// NOTE(review): presumably this happens when the job is re-entered after jobCtx.callcc() below - confirm.
if (locFut.isDone()) {
if (locFut.isFailed())
throw U.convertException((IgniteCheckedException)locFut.error());
try {
return locFut.get();
}
catch (IgniteCheckedException ignored) {
// Should be never thrown.
}
}
GridCacheAdapter<K, V> cache = ((IgniteEx)ignite).context().cache().internalCache(cacheName);
// No such cache on this node: nothing to remove, report success.
if (cache == null)
return completeWithResult(true);
final GridCacheContext<K, V> ctx = cache.context();
ctx.gate().enter();
// Publish this job's future as the last remove-all job future of the cache context.
final AtomicReference<IgniteInternalFuture<Boolean>> refToLastFut = ctx.lastRemoveAllJobFut();
refToLastFut.set(locFut);
try {
if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
return completeWithResult(false); // Ignore this remove request because remove request will be sent again.
GridDhtCacheAdapter<K, V> dht;
GridNearCacheAdapter<K, V> near = null;
// For a near cache the keys are removed from its underlying DHT cache.
if (cache instanceof GridNearCacheAdapter) {
near = ((GridNearCacheAdapter<K, V>) cache);
dht = near.dht();
}
else
dht = (GridDhtCacheAdapter<K, V>) cache;
// Removals are issued through a data streamer configured with this job's flags;
// the streamer is closed when the try-with-resources block exits.
try (DataStreamerImpl dataLdr = (DataStreamerImpl) ignite.dataStreamer(cacheName)) {
dataLdr.maxRemapCount(0);
dataLdr.skipStore(skipStore);
dataLdr.keepBinary(keepBinary);
dataLdr.receiver(DataStreamerCacheUpdaters.batched());
for (int part : ctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)) {
// Synchronization is needed for the case when this job has not finished removing a partition yet
// while a concurrent job has replaced the last job future and started its own removal.
synchronized (refToLastFut) {
IgniteInternalFuture<Boolean> lastFut = ctx.lastRemoveAllJobFut().get();
// Another job registered its future after this one: stop removing, adopt that job's
// result (or error) as the result of this job and hand control back via the job context.
if (lastFut != locFut) {
lastFut.listen((IgniteInClosure<IgniteInternalFuture<Boolean>>)fut -> {
if (lastFut.error() != null)
locFut.onDone(lastFut.error());
else {
try {
completeWithResult(fut.get());
}
catch (IgniteCheckedException ignored) {
// Should be never thrown.
}
}
jobCtx.callcc();
});
return jobCtx.holdcc();
}
GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false);
// The partition must exist, be OWNING when rebalance is enabled, and be successfully reserved.
if (locPart == null || (ctx.rebalanceEnabled() && locPart.state() != OWNING) || !locPart.reserve())
return completeWithResult(false);
try {
GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().cacheKeysIterator(ctx.cacheId(), part);
if (iter != null) {
try {
while (iter.hasNext())
dataLdr.removeDataInternal(iter.next());
}
finally {
iter.close();
}
}
}
finally {
// Always undo the reservation taken in the condition above.
locPart.release();
}
}
}
}
// For a near cache, additionally drop entries that are not valid for the requested topology version.
if (near != null) {
GridCacheVersion obsoleteVer = ctx.versions().next();
for (GridCacheEntryEx e : near.allEntries()) {
if (!e.valid(topVer) && e.markObsolete(obsoleteVer))
near.removeEntry(e);
}
}
return completeWithResult(true);
}
catch (IgniteCheckedException e) {
// Fail the local future as well, so that listeners of this job's future observe the error.
locFut.onDone(e);
throw U.convertException(e);
}
finally {
ctx.gate().leave();
}
}
/**
* Completes the local future with the provided result.
*
* @param b Result to complete the future with.
* @return The same value that was passed in.
*/
private boolean completeWithResult(boolean b) {
locFut.onDone(b);
return b;
}
}
}