blob: 4b9114f587d2a163fc48e86d4830fd481bf5033b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.datastructures;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicReference;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.systemview.walker.AtomicLongViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.AtomicReferenceViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.AtomicSequenceViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.AtomicStampedViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.CountDownLatchViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.QueueViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.ReentrantLockViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.SemaphoreViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.SetViewWalker;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.lang.IgnitePredicateX;
import org.apache.ignite.internal.util.lang.gridfunc.PredicateCollectionView;
import org.apache.ignite.internal.util.lang.gridfunc.TransformCollectionView;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.GPR;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.systemview.view.datastructures.AtomicLongView;
import org.apache.ignite.spi.systemview.view.datastructures.AtomicReferenceView;
import org.apache.ignite.spi.systemview.view.datastructures.AtomicSequenceView;
import org.apache.ignite.spi.systemview.view.datastructures.AtomicStampedView;
import org.apache.ignite.spi.systemview.view.datastructures.CountDownLatchView;
import org.apache.ignite.spi.systemview.view.datastructures.QueueView;
import org.apache.ignite.spi.systemview.view.datastructures.ReentrantLockView;
import org.apache.ignite.spi.systemview.view.datastructures.SemaphoreView;
import org.apache.ignite.spi.systemview.view.datastructures.SetView;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.ATOMIC_LONG;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.ATOMIC_REF;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.ATOMIC_SEQ;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.ATOMIC_STAMPED;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.COUNT_DOWN_LATCH;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.QUEUE;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.REENTRANT_LOCK;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.SEMAPHORE;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.SET;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Manager of data structures.
*/
public final class DataStructuresProcessor extends GridProcessorAdapter implements IgniteChangeGlobalStateSupport {
/** DataRegionConfiguration name reserved for volatile caches. */
public static final String VOLATILE_DATA_REGION_NAME = "volatileDsMemPlc";
/** Base cache group name for volatile data structures; the data region name is appended after {@code '@'}. */
public static final String DEFAULT_VOLATILE_DS_GROUP_NAME = "default-volatile-ds-group";
/** Cache group name used for atomics when the configuration does not specify one. */
public static final String DEFAULT_DS_GROUP_NAME = "default-ds-group";
/** Cache name prefix recognized by {@link #isDataStructureCache(String)}. */
private static final String DS_CACHE_NAME_PREFIX = "datastructures_";
/** Atomics system cache name; actual cache names are this value followed by {@code '@'} and the group name. */
public static final String ATOMICS_CACHE_NAME = "ignite-sys-atomic-cache";
/** Name of the system view listing queues. */
public static final String QUEUES_VIEW = metricName("ds", "queues");
/** Name of the system view listing sets. */
public static final String SETS_VIEW = metricName("ds", "sets");
/** Name of the system view listing reentrant locks. */
public static final String LOCKS_VIEW = metricName("ds", "reentrantlocks");
/** Name of the system view listing semaphores. */
public static final String SEMAPHORES_VIEW = metricName("ds", "semaphores");
/** Name of the system view listing count down latches. */
public static final String LATCHES_VIEW = metricName("ds", "countdownlatches");
/** Name of the system view listing atomic stamped instances. */
public static final String STAMPED_VIEW = metricName("ds", "atomicstamped");
/** Name of the system view listing atomic references. */
public static final String REFERENCES_VIEW = metricName("ds", "atomicreferences");
/** Name of the system view listing atomic longs. */
public static final String LONGS_VIEW = metricName("ds", "atomiclongs");
/** Name of the system view listing atomic sequences. */
public static final String SEQUENCES_VIEW = metricName("ds", "atomicsequences");
/** Description of the {@link #QUEUES_VIEW} system view. */
private static final String QUEUES_VIEW_DESC = "Data structure queues";
/** Description of the {@link #SETS_VIEW} system view. */
private static final String SETS_VIEW_DESC = "Data structure sets";
/** Description of the {@link #LOCKS_VIEW} system view. */
private static final String LOCKS_VIEW_DESC = "Data structure reentrant locks";
/** Description of the {@link #SEMAPHORES_VIEW} system view. */
private static final String SEMAPHORES_VIEW_DESC = "Data structure semaphores";
/** Description of the {@link #LATCHES_VIEW} system view. */
private static final String LATCHES_VIEW_DESC = "Data structure count down latches";
/** Description of the {@link #STAMPED_VIEW} system view. */
private static final String STAMPED_VIEW_DESC = "Data structure atomic stamped";
/** Description of the {@link #REFERENCES_VIEW} system view. */
private static final String REFERENCES_VIEW_DESC = "Data structure atomic references";
/** Description of the {@link #LONGS_VIEW} system view. */
private static final String LONGS_VIEW_DESC = "Data structure atomic longs";
/** Description of the {@link #SEQUENCES_VIEW} system view. */
private static final String SEQUENCES_VIEW_DESC = "Data structure atomic sequences";
/** A non-collocated IgniteSet uses a separate cache if no node in the cluster is older than this version. */
private static final IgniteProductVersion SEPARATE_CACHE_PER_NON_COLLOCATED_SET_SINCE =
IgniteProductVersion.fromString("2.7.0");
/** Initial capacity of {@link #dsMap}. */
private static final int INITIAL_CAPACITY = 10;
/**
* Initialization latch. Reset to {@code null} by {@link #onKernalStop(boolean)} and
* {@link #onDeActivate(GridKernalContext)}; re-created by {@link #onBeforeActivate()}.
*/
private volatile CountDownLatch initLatch = new CountDownLatch(1);
/**
* Initialization failed flag. Raised by {@link #onKernalStop(boolean)} when the latch was still pending,
* cleared by {@link #onActivate(GridKernalContext)}.
*/
private boolean initFailed;
/** Internal storage of all dataStructures items (sequence, atomic long etc.). */
private final ConcurrentMap<GridCacheInternalKey, GridCacheRemovable> dsMap;
/** Atomic data structures configuration taken from the node configuration. */
private final AtomicConfiguration dfltAtomicCfg;
/** Map of continuous query IDs keyed by cache ID. */
private final ConcurrentHashMap<Integer, UUID> qryIdMap = new ConcurrentHashMap<>();
/** Local listener that tells registered semaphores and locks which node has gone. */
private final GridLocalEventListener lsnr = new GridLocalEventListener() {
    @Override public void onEvent(final Event evt) {
        // Handling may need cache operations, so it must not run on the event notification thread.
        GridPlainCallable<Object> notifyTask = new GridPlainCallable<Object>() {
            @Override public Object call() {
                UUID goneNodeId = ((DiscoveryEvent)evt).eventNode().id();

                for (GridCacheRemovable struct : dsMap.values()) {
                    if (struct instanceof GridCacheSemaphoreEx) {
                        ((GridCacheSemaphoreEx)struct).onNodeRemoved(goneNodeId);

                        continue;
                    }

                    if (struct instanceof GridCacheLockEx)
                        ((GridCacheLockEx)struct).onNodeRemoved(goneNodeId);
                }

                return null;
            }
        };

        ctx.closure().callLocalSafe(notifyTask, false);
    }
};
/**
 * Creates the processor with an empty registry of local data structures and the atomic
 * configuration taken from the node configuration.
 *
 * @param ctx Context.
 */
public DataStructuresProcessor(GridKernalContext ctx) {
    super(ctx);

    this.dfltAtomicCfg = ctx.config().getAtomicConfiguration();
    this.dsMap = new ConcurrentHashMap<>(INITIAL_CAPACITY);
}
/** {@inheritDoc} */
@Override public void start() {
// Subscribe to node departure events; the listener forwards them to registered semaphores and locks.
ctx.event().addLocalEventListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
/** {@inheritDoc} */
@Override public void onKernalStart(boolean active) {
    // System views are registered regardless of the cluster state.
    registerSystemViews();

    // Initialization is completed here only when the node starts in an active cluster.
    if (active)
        onKernalStart0();
}
/**
 * Installs a fresh initialization latch unless a pending (not yet released) one is already in place.
 */
public void onBeforeActivate() {
    CountDownLatch cur = initLatch;

    boolean pending = cur != null && cur.getCount() != 0;

    if (!pending)
        initLatch = new CountDownLatch(1);
}
/**
 * Releases threads waiting on the initialization latch.
 * <p>
 * The latch reference is read once into a local because {@link #onKernalStop(boolean)} and
 * {@link #onDeActivate(GridKernalContext)} reset {@code initLatch} to {@code null}. In that case there is
 * nothing to release, so the call is a no-op instead of failing with a {@link NullPointerException}.
 */
private void onKernalStart0() {
    CountDownLatch latch0 = initLatch;

    if (latch0 != null)
        latch0.countDown();
}
/**
 * Starts the internal continuous query for the given cache once; the query ID is remembered per cache ID.
 *
 * @param cctx Cache context.
 * @throws IgniteCheckedException If failed.
 */
private void startQuery(GridCacheContext cctx) throws IgniteCheckedException {
    // Fast path without locking.
    if (qryIdMap.containsKey(cctx.cacheId()))
        return;

    synchronized (this) {
        // Re-check under the monitor: another thread may have registered the query meanwhile.
        if (qryIdMap.containsKey(cctx.cacheId()))
            return;

        qryIdMap.put(cctx.cacheId(),
            cctx.continuousQueries().executeInternalQuery(
                new DataStructuresEntryListener(),
                new DataStructuresEntryFilter(),
                cctx.isReplicated() && cctx.affinityNode(),
                false,
                false,
                true
            ));
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Stops registered semaphores and locks, releases a still pending initialization latch (marking
 * initialization as failed) and cancels every internal continuous query started by this processor.
 */
@Override public void onKernalStop(boolean cancel) {
    super.onKernalStop(cancel);

    for (GridCacheRemovable ds : dsMap.values()) {
        if (ds instanceof GridCacheSemaphoreEx)
            ((GridCacheSemaphoreEx)ds).stop();

        if (ds instanceof GridCacheLockEx)
            ((GridCacheLockEx)ds).onStop();
    }

    CountDownLatch init0 = initLatch;

    if (init0 != null && init0.getCount() > 0) {
        // Flag must be set before waiters are released so that they observe the failure.
        initFailed = true;

        init0.countDown();

        initLatch = null;
    }

    Iterator<Map.Entry<Integer, UUID>> iter = qryIdMap.entrySet().iterator();

    while (iter.hasNext()) {
        Map.Entry<Integer, UUID> e = iter.next();

        iter.remove();

        GridCacheContext cctx = ctx.cache().context().cacheContext(e.getKey());

        // NOTE(review): the lookup by cache ID presumably yields null once the cache is gone - confirm.
        // Skip such entries so that one missing context does not abort the remaining cancellations.
        if (cctx != null)
            cctx.continuousQueries().cancelInternalQuery(e.getValue());
    }
}
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext ctx) {
    if (log.isDebugEnabled()) {
        log.debug("Activating data structure processor [nodeId=" + ctx.localNodeId() +
            " topVer=" + ctx.discovery().topologyVersionEx() + " ]");
    }

    // Drop state left over from the previous active period.
    qryIdMap.clear();
    initFailed = false;

    ctx.event().addLocalEventListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);

    restoreStructuresState(ctx);

    onKernalStart0();
}
/**
* Registers one system view per data structure kind. Views of atomics, latches, semaphores and locks are
* filtered projections of the locally registered instances; queue and set views are collected from the
* data structures manager of every cache whose type is {@link CacheType#DATA_STRUCTURES}.
*/
private void registerSystemViews() {
// Views backed by dsMap: each one keeps only the values implementing the corresponding public interface.
ctx.systemView().registerView(
SEQUENCES_VIEW,
SEQUENCES_VIEW_DESC,
new AtomicSequenceViewWalker(),
new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteAtomicSequence),
AtomicSequenceView::new
);
ctx.systemView().registerView(
LONGS_VIEW,
LONGS_VIEW_DESC,
new AtomicLongViewWalker(),
new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteAtomicLong),
AtomicLongView::new
);
ctx.systemView().registerView(
REFERENCES_VIEW,
REFERENCES_VIEW_DESC,
new AtomicReferenceViewWalker(),
new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteAtomicReference),
AtomicReferenceView::new
);
ctx.systemView().registerView(
STAMPED_VIEW,
STAMPED_VIEW_DESC,
new AtomicStampedViewWalker(),
new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteAtomicStamped),
AtomicStampedView::new
);
ctx.systemView().registerView(
LATCHES_VIEW,
LATCHES_VIEW_DESC,
new CountDownLatchViewWalker(),
new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteCountDownLatch),
CountDownLatchView::new
);
ctx.systemView().registerView(
SEMAPHORES_VIEW,
SEMAPHORES_VIEW_DESC,
new SemaphoreViewWalker(),
new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteSemaphore),
SemaphoreView::new
);
ctx.systemView().registerView(
LOCKS_VIEW,
LOCKS_VIEW_DESC,
new ReentrantLockViewWalker(),
new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteLock),
ReentrantLockView::new
);
// Collection views: outer collection is the data structures manager of each DATA_STRUCTURES cache,
// inner collection is the queues (or sets) held by that manager.
ctx.systemView().registerInnerCollectionView(
QUEUES_VIEW,
QUEUES_VIEW_DESC,
new QueueViewWalker(),
new TransformCollectionView<>(
ctx.cache().cacheDescriptors().values(),
desc -> ctx.cache().cache(desc.cacheName()).context().dataStructures(),
desc -> desc.cacheType() == CacheType.DATA_STRUCTURES),
cctx -> cctx.queues().values(),
(cctx, queue) -> new QueueView(queue)
);
ctx.systemView().registerInnerCollectionView(
SETS_VIEW,
SETS_VIEW_DESC,
new SetViewWalker(),
F.viewReadOnly(
ctx.cache().cacheDescriptors().values(),
desc -> ctx.cache().cache(desc.cacheName()).context().dataStructures(),
desc -> desc.cacheType() == CacheType.DATA_STRUCTURES),
cctx -> cctx.sets().values(),
(cctx, set) -> new SetView(set)
);
}
/**
 * Releases the initialization latch and then notifies every registered data structure that supports
 * global state changes about activation. A failure is logged and ends the notification loop.
 *
 * @param ctx Context.
 */
public void restoreStructuresState(GridKernalContext ctx) {
    onKernalStart0();

    try {
        for (GridCacheRemovable ds : dsMap.values()) {
            if (!(ds instanceof IgniteChangeGlobalStateSupport))
                continue;

            IgniteChangeGlobalStateSupport stateAware = (IgniteChangeGlobalStateSupport)ds;

            stateAware.onActivate(ctx);
        }
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed restore data structures state", e);
    }
}
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext ctx) {
    if (log.isDebugEnabled()) {
        log.debug("DeActivate data structure processor [nodeId=" + ctx.localNodeId() +
            ", topVer=" + ctx.discovery().topologyVersionEx() + "]");
    }

    ctx.event().removeLocalEventListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);

    // Reuse the stop routine, then drop the latch unconditionally.
    onKernalStop(false);

    initLatch = null;

    for (GridCacheRemovable ds : dsMap.values()) {
        if (!(ds instanceof IgniteChangeGlobalStateSupport))
            continue;

        IgniteChangeGlobalStateSupport stateAware = (IgniteChangeGlobalStateSupport)ds;

        stateAware.onDeActivate(ctx);
    }
}
/**
* Unregisters a local data structure instance. The mapping is removed only while {@code key} is still
* mapped to {@code obj}.
*
* @param key Key.
* @param obj Object.
*/
void onRemoved(GridCacheInternalKey key, GridCacheRemovable obj) {
dsMap.remove(key, obj);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
    final UUID locNodeId = ctx.localNodeId();

    for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> entry : dsMap.entrySet()) {
        GridCacheRemovable ds = entry.getValue();

        if (!clusterRestarted)
            ds.needCheckNotRemoved();
        else {
            // After a cluster restart the instance is reported as removed and unregistered locally.
            ds.onRemoved();

            dsMap.remove(entry.getKey(), ds);
        }

        if (ds instanceof GridCacheLockEx) {
            GridCacheLockEx lock = (GridCacheLockEx)ds;

            lock.onReconnected(locNodeId);
        }
    }

    for (GridCacheContext cacheCtx : ctx.cache().context().cacheContexts())
        cacheCtx.dataStructures().onReconnected(clusterRestarted);

    return null;
}
/**
 * Tells whether the given cache name belongs to data structures: it either starts with the atomics
 * cache name or the data structures cache prefix, or equals one of the default group names.
 *
 * @param cacheName Cache name.
 * @return {@code True} if cache with such name is used to store data structures.
 */
public static boolean isDataStructureCache(String cacheName) {
    if (cacheName == null)
        return false;

    if (cacheName.startsWith(ATOMICS_CACHE_NAME) || cacheName.startsWith(DS_CACHE_NAME_PREFIX))
        return true;

    return DEFAULT_DS_GROUP_NAME.equals(cacheName) || DEFAULT_VOLATILE_DS_GROUP_NAME.equals(cacheName);
}
/**
 * Tells whether the group name is reserved: it equals the default data structures group name or starts
 * with the default volatile group name.
 *
 * @param grpName Group name.
 * @return {@code True} if group name is reserved to store data structures.
 */
public static boolean isReservedGroup(@Nullable String grpName) {
    if (grpName == null)
        return false;

    return grpName.equals(DEFAULT_DS_GROUP_NAME) || grpName.startsWith(DEFAULT_VOLATILE_DS_GROUP_NAME);
}
/**
 * Returns the atomic sequence with the given name, creating it when requested and absent.
 *
 * @param name Sequence name.
 * @param cfg Configuration.
 * @param initVal Initial value for sequence. If sequence already cached, {@code initVal} will be ignored.
 * @param create If {@code true} sequence will be created in case it is not in cache.
 * @return Sequence.
 * @throws IgniteCheckedException If loading failed.
 */
public final IgniteAtomicSequence sequence(final String name,
    @Nullable final AtomicConfiguration cfg,
    final long initVal,
    final boolean create
) throws IgniteCheckedException {
    return getAtomic(new AtomicAccessor<GridCacheAtomicSequenceEx>() {
        @Override public T2<GridCacheAtomicSequenceEx, AtomicDataStructureValue> get(GridCacheInternalKey key,
            AtomicDataStructureValue val, IgniteInternalCache cache) throws IgniteCheckedException {
            GridCacheAtomicSequenceValue stored = cast(val, GridCacheAtomicSequenceValue.class);

            // Another thread may have registered the sequence already.
            GridCacheAtomicSequenceEx existing = cast(dsMap.get(key), GridCacheAtomicSequenceEx.class);

            if (existing != null) {
                assert stored != null;

                return new T2<>(existing, null);
            }

            if (stored == null && !create)
                return null;

            AtomicConfiguration effCfg = cfg != null ? cfg : dfltAtomicCfg;

            // The left border of the range is already reserved, hence the offset.
            long rangeOff = effCfg.getAtomicSequenceReserveSize() > 1 ?
                effCfg.getAtomicSequenceReserveSize() - 1 : 1;

            long locStart = stored == null ? initVal : stored.get();
            long locEnd = locStart + rangeOff;

            // The global counter has to point past the locally reserved range.
            if (stored == null)
                stored = new GridCacheAtomicSequenceValue(locEnd + 1);
            else
                stored.set(locEnd + 1);

            // Creation happens inside the transaction scope, so a single thread gets here per sequence.
            GridCacheAtomicSequenceEx created = new GridCacheAtomicSequenceImpl(name,
                key,
                cache,
                effCfg.getAtomicSequenceReserveSize(),
                locStart,
                locEnd);

            return new T2<GridCacheAtomicSequenceEx, AtomicDataStructureValue>(created, stored);
        }
    }, cfg, name, DataStructureType.ATOMIC_SEQ, create, GridCacheAtomicSequenceEx.class);
}
/**
* Removes sequence from cache. Delegates to the common removal routine without a predicate and without
* an after-removal closure.
*
* @param name Sequence name.
* @param grpName Group name.
* @throws IgniteCheckedException If removing failed.
*/
final void removeSequence(final String name, final String grpName) throws IgniteCheckedException {
removeDataStructure(null, name, grpName, ATOMIC_SEQ, null);
}
/**
 * Returns the atomic long with the given name, creating it when requested and absent.
 *
 * @param name Name of atomic long.
 * @param cfg Configuration.
 * @param initVal Initial value for atomic long. If atomic long already cached, {@code initVal}
 *      will be ignored.
 * @param create If {@code true} atomic long will be created in case it is not in cache.
 * @return Atomic long.
 * @throws IgniteCheckedException If loading failed.
 */
public final IgniteAtomicLong atomicLong(final String name,
    @Nullable AtomicConfiguration cfg,
    final long initVal,
    final boolean create) throws IgniteCheckedException {
    AtomicAccessor<GridCacheAtomicLongEx> accessor = new AtomicAccessor<GridCacheAtomicLongEx>() {
        @Override public T2<GridCacheAtomicLongEx, AtomicDataStructureValue> get(
            GridCacheInternalKey key,
            AtomicDataStructureValue val,
            IgniteInternalCache cache
        ) throws IgniteCheckedException {
            // Another thread may have registered the atomic long already.
            GridCacheAtomicLongEx existing = cast(dsMap.get(key), GridCacheAtomicLongEx.class);

            if (existing != null) {
                assert val != null;

                return new T2<>(existing, null);
            }

            if (val == null && !create)
                return null;

            // A value is written to the cache only when none is stored yet.
            GridCacheAtomicLongValue toStore = null;

            if (val == null)
                toStore = new GridCacheAtomicLongValue(initVal);

            GridCacheAtomicLongEx created = new GridCacheAtomicLongImpl(name, key, cache);

            return new T2<GridCacheAtomicLongEx, AtomicDataStructureValue>(created, toStore);
        }
    };

    return getAtomic(accessor, cfg, name, ATOMIC_LONG, create, GridCacheAtomicLongEx.class);
}
/**
* Returns a locally registered atomic data structure or, inside a pessimistic repeatable-read transaction
* on the atomics cache of the resolved group, obtains one from the given accessor and registers it.
*
* @param c Closure creating data structure instance.
* @param cfg Optional custom configuration or {@code null} to use default one.
* @param name Data structure name.
* @param type Data structure type.
* @param create Create flag.
* @param cls Expected data structure class.
* @return Data structure instance, or {@code null} if {@code create} is {@code false} and either the
*      atomics cache has no descriptor or no (non-obsolete) value is stored under the key.
* @throws IgniteCheckedException If failed.
*/
@Nullable private <T extends GridCacheRemovable> T getAtomic(final AtomicAccessor<T> c,
@Nullable AtomicConfiguration cfg,
final String name,
final DataStructureType type,
final boolean create,
Class<? extends T> cls
) throws IgniteCheckedException {
A.notNull(name, "name");
awaitInitialization();
if (cfg == null) {
checkAtomicsConfiguration();
cfg = dfltAtomicCfg;
}
String dataRegionName = null;
final String grpName;
// Volatile types use a dedicated group whose name embeds the volatile data region name;
// otherwise the configured group is used, falling back to the default one.
if (type.isVolatile()) {
String volatileGrpName = DEFAULT_VOLATILE_DS_GROUP_NAME;
dataRegionName = VOLATILE_DATA_REGION_NAME;
volatileGrpName += "@" + dataRegionName;
grpName = volatileGrpName;
}
else if (cfg.getGroupName() != null)
grpName = cfg.getGroupName();
else
grpName = DEFAULT_DS_GROUP_NAME;
// One atomics cache per group.
String cacheName = ATOMICS_CACHE_NAME + "@" + grpName;
IgniteInternalCache<GridCacheInternalKey, AtomicDataStructureValue> cache0 = ctx.cache().cache(cacheName);
if (cache0 == null) {
// A pure lookup gives up when the cache has no descriptor; in every other case the cache is started.
if (!create && ctx.cache().cacheDescriptor(cacheName) == null)
return null;
ctx.cache().dynamicStartCache(cacheConfiguration(cfg, cacheName, grpName, dataRegionName),
cacheName,
null,
CacheType.DATA_STRUCTURES,
false,
false,
true,
true).get();
cache0 = ctx.cache().cache(cacheName);
assert cache0 != null;
}
final IgniteInternalCache<GridCacheInternalKey, AtomicDataStructureValue> cache = cache0;
startQuery(cache.context());
final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name, grpName);
// Check type of structure received by key from local cache.
T dataStructure = cast(dsMap.get(key), cls);
if (dataStructure != null)
return dataStructure;
return retryTopologySafe(new IgniteOutClosureX<T>() {
@Override public T applyx() throws IgniteCheckedException {
cache.context().gate().enter();
try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
AtomicDataStructureValue val = cache.get(key);
// An obsolete stored value is treated as absent.
if (isObsolete(val))
val = null;
if (val == null && !create)
return null;
if (val != null) {
if (val.type() != type)
throw new IgniteCheckedException("Another data structure with the same name already created " +
"[name=" + name +
", newType=" + type +
", existingType=" + val.type() + ']');
}
T2<T, ? extends AtomicDataStructureValue> ret;
try {
ret = c.get(key, val, cache);
// The instance is registered locally before the commit; the catch block below unregisters it on failure.
dsMap.put(key, ret.get1());
// The accessor returns a value only when something has to be written to the cache.
if (ret.get2() != null)
cache.put(key, ret.get2());
tx.commit();
}
catch (Error | Exception e) {
dsMap.remove(key);
U.error(log, "Failed to make datastructure: " + name, e);
throw e;
}
return ret.get1();
}
finally {
cache.context().gate().leave();
}
}
});
}
/**
 * A value is obsolete when it is a volatile value whose recorded grid start time differs from the
 * current grid start time.
 *
 * @param val Value.
 * @return {@code True} if value is obsolete.
 */
private boolean isObsolete(AtomicDataStructureValue val) {
    // instanceof is false for null, so no separate null check is required.
    if (!(val instanceof VolatileAtomicDataStructureValue))
        return false;

    return ((VolatileAtomicDataStructureValue)val).gridStartTime() != ctx.discovery().gridStartTime();
}
/**
* Removes atomic long from cache. Delegates to the common removal routine without a predicate and
* without an after-removal closure.
*
* @param name Atomic long name.
* @param grpName Group name.
* @throws IgniteCheckedException If removing failed.
*/
final void removeAtomicLong(final String name, @Nullable final String grpName) throws IgniteCheckedException {
removeDataStructure(null, name, grpName, ATOMIC_LONG, null);
}
/**
* Removes the value of a data structure from the atomics cache of the given group inside a pessimistic
* repeatable-read transaction. Nothing happens when the cache is not available, its gate cannot be
* entered, or no value is stored under the key.
*
* @param pred Remove predicate.
* @param name Data structure name.
* @param grpName Group name.
* @param type Data structure type.
* @param afterRmv Optional closure to run after data structure removed.
* @throws IgniteCheckedException If failed, in particular if the stored value has a different type.
*/
private <T> void removeDataStructure(
@Nullable final IgnitePredicateX<AtomicDataStructureValue> pred,
final String name,
String grpName,
final DataStructureType type,
@Nullable final IgniteInClosureX<T> afterRmv
) throws IgniteCheckedException {
assert name != null;
assert grpName != null;
assert type != null;
awaitInitialization();
final String cacheName = ATOMICS_CACHE_NAME + "@" + grpName;
final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name, grpName);
retryTopologySafe(new IgniteOutClosureX<Object>() {
@Override public Object applyx() throws IgniteCheckedException {
IgniteInternalCache<GridCacheInternalKey, AtomicDataStructureValue> cache = ctx.cache().cache(cacheName);
if (cache != null && cache.context().gate().enterIfNotStopped()) {
// Clear the interrupt flag while removing; it is restored in the finally block.
boolean isInterrupted = Thread.interrupted();
try {
// Retry until the transaction completes without being interrupted.
while (true) {
try {
try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
AtomicDataStructureValue val = cache.get(key);
if (val == null)
return null;
if (val.type() != type)
throw new IgniteCheckedException("Data structure has different type " +
"[name=" + name +
", expectedType=" + type +
", actualType=" + val.type() + ']');
// Without a predicate the value is removed unconditionally.
if (pred == null || pred.applyx(val)) {
cache.remove(key);
tx.commit();
// The closure is always invoked with a null argument.
if (afterRmv != null)
afterRmv.applyx(null);
}
}
break;
}
catch (IgniteCheckedException e) {
// An interruption is remembered (and the flag cleared) and the attempt is repeated;
// any other failure is propagated.
if (X.hasCause(e, IgniteInterruptedCheckedException.class, InterruptedException.class))
isInterrupted = Thread.interrupted();
else
throw e;
}
}
}
finally {
cache.context().gate().leave();
if (isInterrupted)
Thread.currentThread().interrupt();
}
}
return null;
}
});
}
/**
 * Suspends every registered data structure whose atomics cache name equals the given cache name.
 *
 * @param cacheName To suspend.
 */
public void suspend(String cacheName) {
    for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> entry : dsMap.entrySet()) {
        String atomicsCacheName = ATOMICS_CACHE_NAME + "@" + entry.getKey().groupName();

        if (!atomicsCacheName.equals(cacheName))
            continue;

        entry.getValue().suspend();
    }
}
/**
 * Returns suspended data structures of the given atomics cache to normal work. When no cache instance
 * is supplied, the affected data structures are reported as removed and unregistered instead.
 *
 * @param cacheName To restart.
 * @param cache Cache instance to restart with, or {@code null}.
 */
public void restart(String cacheName, IgniteInternalCache cache) {
    for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> entry : dsMap.entrySet()) {
        String atomicsCacheName = ATOMICS_CACHE_NAME + "@" + entry.getKey().groupName();

        if (!atomicsCacheName.equals(cacheName))
            continue;

        if (cache == null) {
            entry.getValue().onRemoved();

            dsMap.remove(entry.getKey(), entry.getValue());
        }
        else
            entry.getValue().restart(cache);
    }
}
/**
 * Returns the atomic reference with the given name, creating it when requested and absent.
 *
 * @param name Name of atomic reference.
 * @param cfg Configuration.
 * @param initVal Initial value for atomic reference. If atomic reference already cached, {@code initVal}
 *      will be ignored.
 * @param create If {@code true} atomic reference will be created in case it is not in cache.
 * @return Atomic reference.
 * @throws IgniteCheckedException If loading failed.
 */
@SuppressWarnings("unchecked")
public final <T> IgniteAtomicReference<T> atomicReference(final String name,
    @Nullable AtomicConfiguration cfg,
    final T initVal,
    final boolean create
) throws IgniteCheckedException {
    return getAtomic(new AtomicAccessor<GridCacheAtomicReferenceEx>() {
        @Override public T2<GridCacheAtomicReferenceEx, AtomicDataStructureValue> get(
            GridCacheInternalKey key,
            AtomicDataStructureValue val,
            IgniteInternalCache cache
        ) throws IgniteCheckedException {
            // Another thread may have registered the atomic reference already.
            GridCacheAtomicReferenceEx existing = cast(dsMap.get(key),
                GridCacheAtomicReferenceEx.class);

            if (existing != null) {
                assert val != null;

                return new T2<>(existing, null);
            }

            if (val == null && !create)
                return null;

            // A value is written to the cache only when none is stored yet.
            AtomicDataStructureValue toStore = null;

            if (val == null)
                toStore = new GridCacheAtomicReferenceValue<>(initVal);

            GridCacheAtomicReferenceEx created = new GridCacheAtomicReferenceImpl(name, key, cache);

            return new T2<>(created, toStore);
        }
    }, cfg, name, ATOMIC_REF, create, GridCacheAtomicReferenceEx.class);
}
/**
* Removes atomic reference from cache. Delegates to the common removal routine without a predicate and
* without an after-removal closure.
*
* @param name Atomic reference name.
* @param grpName Group name.
* @throws IgniteCheckedException If removing failed.
*/
final void removeAtomicReference(final String name, @Nullable final String grpName) throws IgniteCheckedException {
removeDataStructure(null, name, grpName, ATOMIC_REF, null);
}
/**
 * Gets an atomic stamped from cache or creates one if it's not cached.
 *
 * @param name Name of atomic stamped.
 * @param cfg Configuration.
 * @param initVal Initial value for atomic stamped. If atomic stamped already cached, {@code initVal}
 *      will be ignored.
 * @param initStamp Initial stamp for atomic stamped. If atomic stamped already cached, {@code initStamp}
 *      will be ignored.
 * @param create If {@code true} atomic stamped will be created in case it is not in cache.
 * @return Atomic stamped.
 * @throws IgniteCheckedException If loading failed.
 */
@SuppressWarnings("unchecked")
public final <T, S> IgniteAtomicStamped<T, S> atomicStamped(final String name, @Nullable AtomicConfiguration cfg,
    final T initVal, final S initStamp, final boolean create) throws IgniteCheckedException {
    return getAtomic(new AtomicAccessor<GridCacheAtomicStampedEx>() {
        @Override public T2<GridCacheAtomicStampedEx, AtomicDataStructureValue> get(
            GridCacheInternalKey key,
            AtomicDataStructureValue val,
            IgniteInternalCache cache
        ) throws IgniteCheckedException {
            // Check that atomic stamped hasn't been created in other thread yet.
            GridCacheAtomicStampedEx stmp = cast(dsMap.get(key),
                GridCacheAtomicStampedEx.class);

            if (stmp != null) {
                assert val != null;

                // Diamond instead of a raw T2: type arguments come from the declared return type.
                return new T2<>(stmp, null);
            }

            if (val == null && !create)
                return null;

            // A value is written to the cache only when none is stored yet.
            AtomicDataStructureValue retVal = (val == null ? new GridCacheAtomicStampedValue(initVal, initStamp) : null);

            stmp = new GridCacheAtomicStampedImpl(name, key, cache);

            return new T2<>(stmp, retVal);
        }
    }, cfg, name, ATOMIC_STAMPED, create, GridCacheAtomicStampedEx.class);
}
/**
* Removes atomic stamped from cache. Delegates to the common removal routine without a predicate and
* without an after-removal closure.
*
* @param name Atomic stamped name.
* @param grpName Group name.
* @throws IgniteCheckedException If removing failed.
*/
final void removeAtomicStamped(final String name, final String grpName) throws IgniteCheckedException {
removeDataStructure(null, name, grpName, ATOMIC_STAMPED, null);
}
/**
 * Returns the queue with the given name; a queue is created only when a configuration is supplied.
 *
 * @param name Name of queue.
 * @param grpName Group name. If present, will override groupName from configuration.
 * @param cap Max size of queue. With a configuration present, a non-positive value means
 *      {@link Integer#MAX_VALUE}.
 * @param cfg Non-null queue configuration if new queue should be created.
 * @return Instance of queue.
 * @throws IgniteCheckedException If failed.
 */
public final <T> IgniteQueue<T> queue(final String name, @Nullable final String grpName, int cap,
    @Nullable final CollectionConfiguration cfg)
    throws IgniteCheckedException {
    A.notNull(name, "name");

    final boolean create = cfg != null;

    // The capacity is normalized only on the creation path.
    final int effCap = create && cap <= 0 ? Integer.MAX_VALUE : cap;

    return getCollection(new IgniteClosureX<GridCacheContext, IgniteQueue<T>>() {
        @Override public IgniteQueue<T> applyx(GridCacheContext cctx) throws IgniteCheckedException {
            return cctx.dataStructures().queue(name, effCap, isCollocated(cfg), create);
        }
    }, cfg, name, grpName, QUEUE, create, false);
}
/**
 * Decides whether a collection should work in collocated mode.
 * <p>
 * Non-collocated mode only makes sense for, and is only supported by, PARTITIONED caches, so any
 * other cache mode forces collocated mode regardless of the configured flag.
 *
 * @param cfg Collection configuration.
 * @return {@code True} if collocated mode should be enabled; {@code false} when no configuration is given.
 */
private boolean isCollocated(CollectionConfiguration cfg) {
    if (cfg == null)
        return false;

    if (cfg.isCollocated())
        return true;

    return cfg.getCacheMode() != PARTITIONED;
}
/**
 * Builds the configuration of a cache described by an atomic configuration.
 * <p>
 * Atomicity mode ({@code TRANSACTIONAL}), rebalance mode ({@code SYNC}), write synchronization mode
 * ({@code FULL_SYNC}) and the node filter ({@code ALL_NODES}) are fixed; cache mode, affinity and,
 * for PARTITIONED caches only, the number of backups are taken from {@code cfg}.
 *
 * @param cfg Atomic configuration.
 * @param name Cache name.
 * @param grpName Group name.
 * @param dataRegionName Name of data region for this cache.
 *
 * @return Cache configuration.
 */
private CacheConfiguration cacheConfiguration(AtomicConfiguration cfg, String name, String grpName,
    String dataRegionName) {
    CacheConfiguration res = new CacheConfiguration();

    // Identity of the cache.
    res.setName(name);
    res.setGroupName(grpName);

    // Settings that do not depend on the atomic configuration.
    res.setAtomicityMode(TRANSACTIONAL);
    res.setRebalanceMode(SYNC);
    res.setWriteSynchronizationMode(FULL_SYNC);

    // Settings derived from the atomic configuration.
    res.setCacheMode(cfg.getCacheMode());
    res.setNodeFilter(CacheConfiguration.ALL_NODES);
    res.setAffinity(cfg.getAffinity());
    res.setDataRegionName(dataRegionName);

    // Backups are applied only for PARTITIONED mode.
    boolean partitioned = cfg.getCacheMode() == PARTITIONED;

    if (partitioned)
        res.setBackups(cfg.getBackups());

    return res;
}
/**
 * Builds the configuration of a cache described by a collection configuration.
 * <p>
 * Backups, cache mode, atomicity mode and node filter are copied from {@code cfg}; write
 * synchronization mode is always {@code FULL_SYNC} and rebalance mode is always {@code SYNC}.
 *
 * @param cfg Collection configuration.
 * @param name Cache name.
 * @param grpName Group name.
 * @return Cache configuration.
 */
private CacheConfiguration cacheConfiguration(CollectionConfiguration cfg, String name, String grpName) {
    CacheConfiguration res = new CacheConfiguration();

    // Identity of the cache.
    res.setName(name);
    res.setGroupName(grpName);

    // Settings copied from the collection configuration.
    res.setBackups(cfg.getBackups());
    res.setCacheMode(cfg.getCacheMode());
    res.setAtomicityMode(cfg.getAtomicityMode());
    res.setNodeFilter(cfg.getNodeFilter());

    // Settings that do not depend on the collection configuration.
    res.setWriteSynchronizationMode(FULL_SYNC);
    res.setRebalanceMode(SYNC);

    return res;
}
/**
 * Builds the configuration of a collection metadata cache: the regular collection cache
 * configuration with the atomicity mode overridden to {@code TRANSACTIONAL}.
 *
 * @param cfg Collection configuration.
 * @param name Cache name.
 * @param grpName Group name.
 * @return Meta cache configuration.
 */
private CacheConfiguration metaCacheConfiguration(CollectionConfiguration cfg, String name, String grpName) {
    CacheConfiguration metaCfg = cacheConfiguration(cfg, name, grpName);

    // Overrides whatever atomicity mode the collection configuration asked for.
    metaCfg.setAtomicityMode(TRANSACTIONAL);

    return metaCfg;
}
/**
 * Finds or starts the data structure cache matching the given collection configuration.
 * <p>
 * The cache name is built from the atomicity mode, cache mode, number of backups and group name.
 * For a separated structure that is not found in that cache, the name is further extended with
 * the data structure type and name. An already started cache must have a node filter of the same
 * class as the one requested by {@code cfg}.
 *
 * @param cfg Collection configuration.
 * @param grpName Group name.
 * @param dsType Data structure type.
 * @param dsName Data structure name.
 * @param separated Separated cache flag.
 * @return Data structure cache.
 * @throws IgniteCheckedException If failed.
 */
private IgniteInternalCache compatibleCache(CollectionConfiguration cfg,
    String grpName,
    DataStructureType dsType,
    String dsName,
    boolean separated
) throws IgniteCheckedException {
    String cacheName = DS_CACHE_NAME_PREFIX + cfg.getAtomicityMode() + "_" + cfg.getCacheMode() + "_" +
        cfg.getBackups() + "@" + grpName;

    IgniteInternalCache cache = ctx.cache().cache(cacheName);

    if (separated) {
        // The header lookup is performed only for separated structures.
        boolean foundInSharedCache = cache != null && cache.containsKey(new GridCacheSetHeaderKey(dsName));

        if (!foundInSharedCache) {
            cacheName += "#" + dsType.name() + "_" + dsName;

            cache = ctx.cache().cache(cacheName);
        }
    }

    if (cache != null) {
        // The cache exists already: make sure its node filter is of the requested class.
        IgnitePredicate<ClusterNode> existingFilter = cache.context().group().nodeFilter();

        String existingCls = existingFilter == null ?
            CacheConfiguration.IgniteAllNodesPredicate.class.getName() : existingFilter.getClass().getName();

        String requestedCls = cfg.getNodeFilter() == null ?
            CacheConfiguration.IgniteAllNodesPredicate.class.getName() : cfg.getNodeFilter().getClass().getName();

        if (!existingCls.equals(requestedCls))
            throw new IgniteCheckedException("Could not add collection to group " + grpName +
                " because of different node filters [existing=" + existingCls + ", new=" + requestedCls + "]");
    }
    else {
        ctx.cache().dynamicStartCache(cacheConfiguration(cfg, cacheName, grpName),
            cacheName,
            null,
            CacheType.DATA_STRUCTURES,
            false,
            false,
            true,
            true).get();
    }

    cache = ctx.cache().getOrStartCache(cacheName);

    assert cache != null;

    return cache;
}
/**
 * Removes the queue with the given name.
 * <p>
 * The queue header is removed from the queue cache inside the closure handed to
 * {@code removeDataStructure}; if the removed header is not empty, the queue item keys are
 * removed as well.
 *
 * @param name Queue name.
 * @param cctx Queue cache context.
 * @throws IgniteCheckedException If failed.
 */
public void removeQueue(final String name, final GridCacheContext cctx) throws IgniteCheckedException {
    assert name != null;
    assert cctx != null;

    CIX1<GridCacheQueueHeader> cleanup = new CIX1<GridCacheQueueHeader>() {
        @Override public void applyx(GridCacheQueueHeader ignored) throws IgniteCheckedException {
            // The closure argument is not used: the header is taken from the cache itself.
            GridCacheQueueHeader rmvd = (GridCacheQueueHeader)cctx.cache().withNoRetries()
                .getAndRemove(new GridCacheQueueHeaderKey(name));

            if (rmvd != null && !rmvd.empty()) {
                GridCacheQueueAdapter.removeKeys(cctx.cache(),
                    rmvd.id(),
                    name,
                    rmvd.collocated(),
                    rmvd.head(),
                    rmvd.tail(),
                    0);
            }
        }
    };

    removeDataStructure(null, name, cctx.group().name(), QUEUE, cleanup);
}
/**
* Gets or creates a distributed collection.
* <p>
* Resolves the metadata cache of the cache group, registers (when {@code create} is {@code true})
* or looks up the collection metadata there, validates it against the requested type and
* collocation flag and finally builds the collection instance through the given closure.
*
* @param c Closure creating collection.
* @param cfg Configuration.
* @param name Collection name.
* @param grpName Cache group name. If {@code null}, the group name from {@code cfg} is used, or
* {@code DEFAULT_DS_GROUP_NAME} when that is not set either.
* @param type Data structure type.
* @param create Create flag.
* @param separated Separated cache flag.
* @return Collection instance, or {@code null} if {@code create} is {@code false} and the metadata
* cache, the metadata entry or the collection cache could not be found.
* @throws IgniteCheckedException If failed, in particular if a data structure of another type or a
* collection with a different collocation flag is already registered under the same name.
*/
@Nullable private <T> T getCollection(final IgniteClosureX<GridCacheContext, T> c,
@Nullable CollectionConfiguration cfg,
String name,
@Nullable String grpName,
final DataStructureType type,
boolean create,
boolean separated
) throws IgniteCheckedException {
awaitInitialization();
assert name != null;
assert type.isCollection() : type;
assert !create || cfg != null;
// Explicit group name wins over the configured one, which wins over the default.
if (grpName == null) {
if (cfg != null && cfg.getGroupName() != null)
grpName = cfg.getGroupName();
else
grpName = DEFAULT_DS_GROUP_NAME;
}
final String metaCacheName = ATOMICS_CACHE_NAME + "@" + grpName;
IgniteInternalCache<GridCacheInternalKey, AtomicDataStructureValue> metaCache0 = ctx.cache().cache(metaCacheName);
if (metaCache0 == null) {
CacheConfiguration ccfg = null;
if (!create) {
// Lookup only: without a descriptor for the metadata cache there is nothing to return.
DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(metaCacheName);
if (desc == null)
return null;
}
else
ccfg = metaCacheConfiguration(cfg, metaCacheName, grpName);
// On the lookup path the configuration passed here is null.
ctx.cache().dynamicStartCache(ccfg,
metaCacheName,
null,
CacheType.DATA_STRUCTURES,
false,
false,
true,
true).get();
metaCache0 = ctx.cache().cache(metaCacheName);
assert metaCache0 != null;
}
final IgniteInternalCache<GridCacheInternalKey, AtomicDataStructureValue> metaCache = metaCache0;
AtomicDataStructureValue oldVal;
final IgniteInternalCache cache;
if (create) {
// The collection cache is resolved first so that its name can be recorded in the metadata.
cache = compatibleCache(cfg, grpName, type, name, separated);
DistributedCollectionMetadata newVal = new DistributedCollectionMetadata(type, cfg, cache.name());
// A non-null result means metadata was already registered under this name.
oldVal = metaCache.getAndPutIfAbsent(new GridCacheInternalKeyImpl(name, grpName), newVal);
}
else {
oldVal = metaCache.get(new GridCacheInternalKeyImpl(name, grpName));
if (oldVal == null)
return null;
else if (!(oldVal instanceof DistributedCollectionMetadata))
throw new IgniteCheckedException("Another data structure with the same name already created " +
"[name=" + name +
", newType=" + type +
", existingType=" + oldVal.type() + ']');
// The cache name is taken from the stored metadata.
cache = ctx.cache().getOrStartCache(((DistributedCollectionMetadata)oldVal).cacheName());
if (cache == null)
return null;
}
// Validate the already registered metadata against what the caller asked for.
if (oldVal != null) {
if (oldVal.type() != type)
throw new IgniteCheckedException("Another data structure with the same name already created " +
"[name=" + name +
", newType=" + type +
", existingType=" + oldVal.type() + ']');
assert oldVal instanceof DistributedCollectionMetadata;
if (cfg != null && ((DistributedCollectionMetadata)oldVal).configuration().isCollocated() != cfg.isCollocated()) {
// This branch is reached only when the two flags differ, so the negated new flag
// equals the existing one.
throw new IgniteCheckedException("Another collection with the same name but different " +
"configuration already created [name=" + name +
", newCollocated=" + cfg.isCollocated() +
", existingCollocated=" + !cfg.isCollocated() + ']');
}
}
// The collection itself is created through the topology-safe retry helper.
return retryTopologySafe(new IgniteOutClosureX<T>() {
@Override public T applyx() throws IgniteCheckedException {
return c.applyx(cache.context());
}
});
}
/**
 * Blocks the calling thread until the initialization latch is released.
 *
 * @throws IllegalStateException If there is no initialization latch, if the wait was interrupted,
 *      or if the initialization is marked as failed once the wait is over.
 */
private void awaitInitialization() {
    final CountDownLatch latch = initLatch;

    if (latch == null)
        throw new IllegalStateException("Ignite cluster is not active");

    // Latch is already released: nothing to wait for.
    if (latch.getCount() <= 0)
        return;

    try {
        U.await(latch);
    }
    catch (IgniteInterruptedCheckedException e) {
        throw new IllegalStateException("Failed to initialize data structures processor " +
            "(thread has been interrupted).", e);
    }

    // The failure flag is consulted only by callers that actually had to wait.
    if (initFailed)
        throw new IllegalStateException("Failed to initialize data structures processor.");
}
/**
 * Gets the count down latch with the given name, creating it with the provided count and
 * auto-delete flag when it is not in cache and {@code create} is set.
 *
 * @param name Name of the latch.
 * @param cfg Configuration.
 * @param cnt Initial count.
 * @param autoDel {@code True} to automatically delete latch from cache when
 *      its count reaches zero.
 * @param create If {@code true} latch will be created in case it is not in cache,
 *      if it is {@code false} all parameters except {@code name} are ignored.
 * @return Count down latch for the given name or {@code null} if it is not found and
 *      {@code create} is false.
 * @throws IgniteCheckedException If operation failed.
 */
public IgniteCountDownLatch countDownLatch(final String name,
    @Nullable AtomicConfiguration cfg,
    final int cnt,
    final boolean autoDel,
    final boolean create
) throws IgniteCheckedException {
    // The count is validated only when the latch may actually be created.
    if (create)
        A.ensure(cnt >= 0, "count can not be negative");

    AtomicAccessor<GridCacheCountDownLatchEx> accessor = new AtomicAccessor<GridCacheCountDownLatchEx>() {
        @Override public T2<GridCacheCountDownLatchEx, AtomicDataStructureValue> get(
            GridCacheInternalKey key,
            AtomicDataStructureValue val,
            IgniteInternalCache cache
        ) throws IgniteCheckedException {
            // Check that count down hasn't been created in other thread yet.
            GridCacheCountDownLatchEx cached = cast(dsMap.get(key), GridCacheCountDownLatchEx.class);

            if (cached != null) {
                assert val != null;

                return new T2<>(cached, null);
            }

            if (val == null && !create)
                return null;

            // A new value is built (and returned for storing) only if nothing is stored yet.
            GridCacheCountDownLatchValue newVal = null;
            GridCacheCountDownLatchValue latchVal;

            if (val == null) {
                newVal = new GridCacheCountDownLatchValue(cnt, autoDel,
                    ctx.discovery().gridStartTime());

                latchVal = newVal;
            }
            else
                latchVal = (GridCacheCountDownLatchValue)val;

            assert latchVal != null;

            // The local instance is initialized from the effective value, stored or new.
            GridCacheCountDownLatchEx created = new GridCacheCountDownLatchImpl(name, latchVal.initialCount(),
                latchVal.autoDelete(),
                key,
                cache);

            return new T2<GridCacheCountDownLatchEx, AtomicDataStructureValue>(created, newVal);
        }
    };

    return getAtomic(accessor, cfg, name, COUNT_DOWN_LATCH, create, GridCacheCountDownLatchEx.class);
}
/**
 * Removes count down latch from cache. Removal is refused while the latch count is above zero.
 *
 * @param name Name of the latch.
 * @param grpName Cache group name.
 * @throws IgniteCheckedException If operation failed or the latch still has a non-zero count.
 */
public void removeCountDownLatch(final String name, final String grpName) throws IgniteCheckedException {
    IgnitePredicateX<AtomicDataStructureValue> canRemove = new IgnitePredicateX<AtomicDataStructureValue>() {
        @Override public boolean applyx(AtomicDataStructureValue val) throws IgniteCheckedException {
            // instanceof is false for null, so this also rejects a missing value.
            assert val instanceof GridCacheCountDownLatchValue;

            GridCacheCountDownLatchValue latchVal = (GridCacheCountDownLatchValue)val;

            if (latchVal.get() <= 0)
                return true;

            throw new IgniteCheckedException("Failed to remove count down latch " +
                "with non-zero count: " + latchVal.get());
        }
    };

    removeDataStructure(canRemove, name, grpName, COUNT_DOWN_LATCH, null);
}
/**
 * Gets or creates semaphore. If semaphore is not found in cache,
 * it is created using provided name and count parameter.
 * <p>
 * When an existing state is found and {@code failoverSafe} is set, waiter entries of nodes that
 * are no longer known to the cluster are dropped and their recorded values are added back to the
 * semaphore count; the corrected state is then returned for storing.
 *
 * @param name Name of the semaphore.
 * @param cfg Configuration.
 * @param cnt Initial count.
 * @param failoverSafe {@code True} FailoverSafe parameter.
 * @param create If {@code true} semaphore will be created in case it is not in cache,
 *      if it is {@code false} all parameters except {@code name} are ignored.
 * @return Semaphore for the given name or {@code null} if it is not found and
 *      {@code create} is false.
 * @throws IgniteCheckedException If operation failed.
 */
public IgniteSemaphore semaphore(final String name, @Nullable AtomicConfiguration cfg, final int cnt,
    final boolean failoverSafe, final boolean create)
    throws IgniteCheckedException {
    return getAtomic(new AtomicAccessor<GridCacheSemaphoreEx>() {
        @Override public T2<GridCacheSemaphoreEx, AtomicDataStructureValue> get(
            GridCacheInternalKey key,
            AtomicDataStructureValue val,
            IgniteInternalCache cache
        ) throws IgniteCheckedException {
            // Check that semaphore hasn't been created in other thread yet.
            GridCacheSemaphoreEx sem = cast(dsMap.get(key), GridCacheSemaphoreEx.class);

            if (sem != null) {
                assert val != null;

                return new T2<>(sem, null);
            }

            if (val == null && !create)
                return null;

            // A fresh state is produced only when nothing is stored under the key yet.
            AtomicDataStructureValue retVal = (val == null ? new GridCacheSemaphoreState(cnt,
                new HashMap<UUID, Integer>(), failoverSafe, ctx.discovery().gridStartTime()) : null);

            GridCacheSemaphoreEx sem0 = new GridCacheSemaphoreImpl(name, key, cache);

            // Check cluster state against semaphore state.
            if (val != null && failoverSafe) {
                GridCacheSemaphoreState semState = (GridCacheSemaphoreState)val;

                Map<UUID, Integer> waiters = semState.getWaiters();

                // Primitive accumulator: avoids re-boxing on every addition.
                int permits = semState.getCount();

                boolean updated = false;

                // Entries are removed through the iterator, so no defensive copy of the keys is needed.
                for (Iterator<Map.Entry<UUID, Integer>> it = waiters.entrySet().iterator(); it.hasNext(); ) {
                    Map.Entry<UUID, Integer> waiter = it.next();

                    ClusterNode node = ctx.cluster().get().node(waiter.getKey());

                    if (node == null) {
                        permits += waiter.getValue();

                        it.remove();

                        updated = true;
                    }
                }

                if (updated) {
                    semState.setWaiters(waiters);
                    semState.setCount(permits);

                    // The corrected state replaces the stored one.
                    retVal = semState;
                }
            }

            return new T2<>(sem0, retVal);
        }
    }, cfg, name, SEMAPHORE, create, GridCacheSemaphoreEx.class);
}
/**
 * Removes semaphore from cache. Removal is refused while the semaphore count is negative.
 *
 * @param name Name of the semaphore.
 * @param grpName Group name.
 * @throws IgniteCheckedException If operation failed or the semaphore count is negative.
 */
public void removeSemaphore(final String name, final String grpName) throws IgniteCheckedException {
    IgnitePredicateX<AtomicDataStructureValue> canRemove = new IgnitePredicateX<AtomicDataStructureValue>() {
        @Override public boolean applyx(AtomicDataStructureValue val) throws IgniteCheckedException {
            // instanceof is false for null, so this also rejects a missing value.
            assert val instanceof GridCacheSemaphoreState;

            GridCacheSemaphoreState semVal = (GridCacheSemaphoreState)val;

            if (semVal.getCount() >= 0)
                return true;

            throw new IgniteCheckedException("Failed to remove semaphore with blocked threads. ");
        }
    };

    removeDataStructure(canRemove, name, grpName, SEMAPHORE, null);
}
/**
 * Gets the reentrant lock with the given name, creating it with the provided failover and
 * fairness modes when it is not in cache and {@code create} is set.
 *
 * @param name Name of the reentrant lock.
 * @param cfg Configuration.
 * @param failoverSafe Flag indicating behaviour in case of failure.
 * @param fair Flag indicating fairness policy of this lock.
 * @param create If {@code true} reentrant lock will be created in case it is not in cache.
 * @return ReentrantLock for the given name or {@code null} if it is not found and
 *      {@code create} is false.
 * @throws IgniteCheckedException If operation failed.
 */
public IgniteLock reentrantLock(final String name, @Nullable AtomicConfiguration cfg, final boolean failoverSafe,
    final boolean fair, final boolean create) throws IgniteCheckedException {
    AtomicAccessor<GridCacheLockEx> accessor = new AtomicAccessor<GridCacheLockEx>() {
        @Override public T2<GridCacheLockEx, AtomicDataStructureValue> get(
            GridCacheInternalKey key,
            AtomicDataStructureValue val,
            IgniteInternalCache cache
        ) throws IgniteCheckedException {
            // Check that reentrant lock hasn't been created in other thread yet.
            GridCacheLockEx cached = cast(dsMap.get(key), GridCacheLockEx.class);

            if (cached != null) {
                assert val != null;

                return new T2<>(cached, null);
            }

            // Unlike the sibling accessors, this one answers "not found" with an empty tuple.
            if (val == null && !create)
                return new T2<>(null, null);

            // A fresh state is produced only when nothing is stored under the key yet.
            AtomicDataStructureValue initState = null;

            if (val == null) {
                initState = new GridCacheLockState(0, ctx.localNodeId(),
                    0, failoverSafe, fair, ctx.discovery().gridStartTime());
            }

            GridCacheLockEx created = new GridCacheLockImpl(name, key, cache);

            return new T2<>(created, initState);
        }
    };

    return getAtomic(accessor, cfg, name, REENTRANT_LOCK, create, GridCacheLockEx.class);
}
/**
 * Removes reentrant lock from cache. Unless {@code broken} is set, removal is refused while the
 * lock state value is above zero.
 *
 * @param name Name of the reentrant lock.
 * @param grpName Group name.
 * @param broken Flag indicating the reentrant lock is broken and should be removed unconditionally.
 * @throws IgniteCheckedException If operation failed or the lock cannot be removed in its current state.
 */
public void removeReentrantLock(final String name, final String grpName, final boolean broken) throws IgniteCheckedException {
    IgnitePredicateX<AtomicDataStructureValue> canRemove = new IgnitePredicateX<AtomicDataStructureValue>() {
        @Override public boolean applyx(AtomicDataStructureValue val) throws IgniteCheckedException {
            // instanceof is false for null, so this also rejects a missing value.
            assert val instanceof GridCacheLockState;

            GridCacheLockState lockVal = (GridCacheLockState)val;

            boolean inUse = lockVal.get() > 0;

            if (inUse && !broken)
                throw new IgniteCheckedException("Failed to remove reentrant lock with blocked threads. ");

            return true;
        }
    };

    removeDataStructure(canRemove, name, grpName, REENTRANT_LOCK, null);
}
/**
 * Cache entry event filter for data structure values: accepts created/updated events whose value
 * is a count down latch value, a semaphore state or a lock state, and every removal event.
 */
static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
    /** */
    private static final long serialVersionUID = 0L;

    /** {@inheritDoc} */
    @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
        EventType evtType = evt.getEventType();

        if (evtType != EventType.CREATED && evtType != EventType.UPDATED) {
            assert evtType == EventType.REMOVED : evt;

            // Removals always pass the filter.
            return true;
        }

        Object val = evt.getValue();

        return val instanceof GridCacheCountDownLatchValue ||
            val instanceof GridCacheSemaphoreState ||
            val instanceof GridCacheLockState;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(DataStructuresEntryFilter.class, this);
    }
}
/**
* Cache entry listener that forwards changes of stored data structure values to the local
* instances registered in {@code dsMap}.
* <p>
* Created and updated events are dispatched by value type (count down latch, semaphore, lock);
* removal events unregister the local instance and notify it.
*/
private class DataStructuresEntryListener implements
CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
/** {@inheritDoc} */
@Override public void onUpdated(
Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts
) throws CacheEntryListenerException {
for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) {
if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) {
GridCacheInternal val0 = evt.getValue();
if (val0 instanceof GridCacheCountDownLatchValue) {
final GridCacheInternalKey key = evt.getKey();
// Notify latch on changes.
final GridCacheRemovable latch = dsMap.get(key);
GridCacheCountDownLatchValue val = (GridCacheCountDownLatchValue)val0;
if (latch instanceof GridCacheCountDownLatchEx) {
final GridCacheCountDownLatchEx latch0 = (GridCacheCountDownLatchEx)latch;
latch0.onUpdate(val.get());
// An auto-delete latch that reached zero is unregistered locally right away,
// while its removal from the cache is scheduled as a separate local task.
if (val.get() == 0 && val.autoDelete()) {
dsMap.remove(key);
IgniteInternalFuture<?> rmvFut = ctx.closure().runLocalSafe(new GPR() {
@Override public void run() {
try {
removeCountDownLatch(latch0.name(), key.groupName());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to remove count down latch: " + latch0.name(), e);
}
finally {
ctx.cache().context().txContextReset();
}
}
});
// The latch is told about its removal once the task is done, whether or not it failed.
rmvFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
try {
f.get();
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to remove count down latch: " + latch0.name(), e);
}
latch.onRemoved();
}
});
}
}
// An instance of another type is registered under the key: only report it.
else if (latch != null) {
U.error(log, "Failed to cast object " +
"[expected=" + IgniteCountDownLatch.class.getSimpleName() +
", actual=" + latch.getClass() + ", value=" + latch + ']');
}
}
else if (val0 instanceof GridCacheSemaphoreState) {
GridCacheInternalKey key = evt.getKey();
// Notify semaphore on changes.
final GridCacheRemovable sem = dsMap.get(key);
GridCacheSemaphoreState val = (GridCacheSemaphoreState)val0;
if (sem instanceof GridCacheSemaphoreEx) {
final GridCacheSemaphoreEx semaphore0 = (GridCacheSemaphoreEx)sem;
semaphore0.onUpdate(val);
}
// An instance of another type is registered under the key: only report it.
else if (sem != null) {
U.error(log, "Failed to cast object " +
"[expected=" + IgniteSemaphore.class.getSimpleName() +
", actual=" + sem.getClass() + ", value=" + sem + ']');
}
}
else if (val0 instanceof GridCacheLockState) {
GridCacheInternalKey key = evt.getKey();
// Notify reentrant lock on changes.
final GridCacheRemovable reentrantLock = dsMap.get(key);
GridCacheLockState val = (GridCacheLockState)val0;
if (reentrantLock instanceof GridCacheLockEx) {
final GridCacheLockEx lock0 = (GridCacheLockEx)reentrantLock;
lock0.onUpdate(val);
}
// An instance of another type is registered under the key: only report it.
else if (reentrantLock != null) {
U.error(log, "Failed to cast object " +
"[expected=" + IgniteLock.class.getSimpleName() +
", actual=" + reentrantLock.getClass() + ", value=" + reentrantLock + ']');
}
}
}
else {
assert evt.getEventType() == EventType.REMOVED : evt;
GridCacheInternal key = evt.getKey();
// Entry's val is null if entry deleted.
// Unregister the local instance, if any, and notify it about the removal.
GridCacheRemovable obj = dsMap.remove(key);
if (obj != null)
obj.onRemoved();
}
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataStructuresEntryListener.class, this);
}
}
/**
 * Looks up a distributed set by name; a missing set is created when a configuration is supplied.
 * <p>
 * A non-collocated set is treated as separated when the oldest node in the cluster is at least of
 * version {@code SEPARATE_CACHE_PER_NON_COLLOCATED_SET_SINCE}.
 *
 * @param name Set name.
 * @param grpName Group name. If present, will override groupName from configuration.
 * @param cfg Set configuration if new set should be created.
 * @return Set instance.
 * @throws IgniteCheckedException If failed.
 */
@Nullable public <T> IgniteSet<T> set(final String name, @Nullable final String grpName, @Nullable final CollectionConfiguration cfg)
    throws IgniteCheckedException {
    A.notNull(name, "name");

    // Presence of a configuration is what turns a lookup into "get or create".
    final boolean create = cfg != null;

    final boolean collocated = isCollocated(cfg);

    final boolean separated;

    // The cluster version check is performed for non-collocated sets only.
    if (collocated)
        separated = false;
    else {
        separated = U.isOldestNodeVersionAtLeast(SEPARATE_CACHE_PER_NON_COLLOCATED_SET_SINCE,
            ctx.grid().cluster().nodes());
    }

    CX1<GridCacheContext, IgniteSet<T>> factory = new CX1<GridCacheContext, IgniteSet<T>>() {
        @Override public IgniteSet<T> applyx(GridCacheContext cctx) throws IgniteCheckedException {
            return cctx.dataStructures().set(name, collocated, create, separated);
        }
    };

    return getCollection(factory, cfg, name, grpName, SET, create, separated);
}
/**
 * Looks up an existing set in the cache identified by its id. New sets are never created here.
 *
 * @param name Set name.
 * @param cacheId Cache id.
 * @param collocated Colocated mode flag.
 * @param separated Separated cache flag.
 * @return Set instance, or {@code null} if there is no descriptor or no cache for the given id.
 * @throws IgniteCheckedException If failed.
 */
@Nullable public <T> IgniteSet<T> set(String name, int cacheId, boolean collocated, boolean separated)
    throws IgniteCheckedException {
    A.notNull(name, "name");

    DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId);

    // The cache is looked up only when its descriptor is known.
    IgniteInternalCache<Object, Object> cache = null;

    if (desc != null)
        cache = ctx.cache().cache(desc.cacheName());

    if (cache == null)
        return null;

    // The create flag is always false on this path.
    return cache.context().dataStructures().set(name, collocated, false, separated);
}
/**
 * Removes the set with the given name.
 * <p>
 * The set header is removed from the set cache inside the closure handed to
 * {@code removeDataStructure}; if a header was present, the set data is removed as well.
 *
 * @param name Set name.
 * @param cctx Set cache context.
 * @throws IgniteCheckedException If failed.
 */
public void removeSet(final String name, final GridCacheContext cctx) throws IgniteCheckedException {
    assert name != null;
    assert cctx != null;

    CIX1<GridCacheSetHeader> cleanup = new CIX1<GridCacheSetHeader>() {
        @Override public void applyx(GridCacheSetHeader ignored) throws IgniteCheckedException {
            // The closure argument is not used: the header is taken from the cache itself.
            GridCacheSetHeader rmvd = (GridCacheSetHeader)cctx.cache().withNoRetries()
                .getAndRemove(new GridCacheSetHeaderKey(name));

            if (rmvd == null)
                return;

            cctx.dataStructures().removeSetData(rmvd.id(), rmvd.separated());
        }
    };

    removeDataStructure(null, name, cctx.group().name(), SET, cleanup);
}
/**
 * Runs the callable through {@code GridCacheUtils.retryTopologySafe}, converting any exception
 * other than {@code IgniteCheckedException} into one.
 *
 * @param log Logger (not used by this method).
 * @param call Callable.
 * @return Callable result.
 * @throws IgniteCheckedException If all retries failed.
 */
public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException {
    try {
        return GridCacheUtils.retryTopologySafe(call);
    }
    catch (Exception e) {
        // Checked Ignite exceptions are passed through untouched, everything else is wrapped.
        if (e instanceof IgniteCheckedException)
            throw (IgniteCheckedException)e;

        throw new IgniteCheckedException(e);
    }
}
/**
 * Tries to cast the object to the expected type.
 *
 * @param obj Object to cast.
 * @param cls Expected class.
 * @param <R> Type of expected result.
 * @return {@code obj} cast to {@code cls}, or {@code null} if {@code obj} is {@code null} or is
 *      not an instance of {@code cls}.
 * @throws IgniteCheckedException Declared by the signature; this implementation never throws it,
 *      a type mismatch is reported by returning {@code null}.
 */
@Nullable private <R> R cast(@Nullable Object obj, Class<R> cls) throws IgniteCheckedException {
    if (obj == null)
        return null;

    // Class.cast() is a checked cast, so no unchecked conversion to R is needed.
    return cls.isInstance(obj) ? cls.cast(obj) : null;
}
/** {@inheritDoc} */
@Override public void printMemoryStats() {
// Prints a header with the Ignite instance name followed by the current size of dsMap.
X.println(">>> ");
X.println(">>> Data structure processor memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + ']');
X.println(">>> dsMapSize: " + dsMap.size());
}
/**
 * Makes sure that a default atomic configuration is available.
 *
 * @throws IgniteException If atomics configuration is not provided.
 */
private void checkAtomicsConfiguration() throws IgniteException {
    if (dfltAtomicCfg != null)
        return;

    throw new IgniteException("Atomic data structure can not be created, " +
        "need to provide AtomicConfiguration.");
}
/**
 * Runs the closure, retrying it up to {@code GridCacheAdapter.MAX_RETRIES} times when it fails
 * with a cluster topology error.
 * <p>
 * A {@code NodeStoppingException}, a failure on the last attempt, a failure that has no
 * {@code ClusterTopologyCheckedException} among its causes, and a
 * {@code ClusterTopologyServerNotFoundException} are all rethrown immediately. Otherwise the
 * retry-ready future of the topology error, if any, is awaited before the next attempt.
 *
 * @param c Closure to run.
 * @throws IgniteCheckedException If failed.
 * @return Closure return value.
 */
private static <T> T retryTopologySafe(IgniteOutClosureX<T> c) throws IgniteCheckedException {
    int attempt = 0;

    while (attempt < GridCacheAdapter.MAX_RETRIES) {
        try {
            return c.applyx();
        }
        catch (NodeStoppingException e) {
            throw e;
        }
        catch (IgniteCheckedException e) {
            boolean lastAttempt = attempt == GridCacheAdapter.MAX_RETRIES - 1;

            if (lastAttempt)
                throw e;

            ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);

            boolean retriable = topErr != null && !(topErr instanceof ClusterTopologyServerNotFoundException);

            if (!retriable)
                throw e;

            // Wait until a retry is possible, if the error provides such a future.
            IgniteInternalFuture<?> retryFut = topErr.retryReadyFuture();

            if (retryFut != null)
                retryFut.get();
        }

        attempt++;
    }

    assert false;

    return null;
}
/**
* Callback passed to {@code getAtomic} that produces the local data structure instance for a key
* together with the value, if any, that should be stored in the cache for it.
*
* @param <T> Type of the data structure instance.
*/
private interface AtomicAccessor<T> {
/**
* @param key Key.
* @param val Existing value, {@code null} if nothing is stored for the key.
* @param cache Data structure cache.
* @return Data structure instance and value to store in cache. Implementations in this class
* pass {@code null} as the second element when there is nothing to store.
* @throws IgniteCheckedException If failed.
*/
T2<T, AtomicDataStructureValue> get(GridCacheInternalKey key,
@Nullable AtomicDataStructureValue val,
IgniteInternalCache cache) throws IgniteCheckedException;
}
}