blob: 4e20267140b0756d8852f74b02b8605a88c9e7aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.datastructures;
import java.io.Externalizable;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Cache sequence implementation.
*/
public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<GridCacheAtomicSequenceValue>
implements GridCacheAtomicSequenceEx, IgniteChangeGlobalStateSupport, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** De-serialization stash. */
private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
@Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
return new IgniteBiTuple<>();
}
};
/** Local value of sequence. */
@GridToStringInclude(sensitive = true)
private volatile long locVal;
/** Upper bound of local counter. */
private long upBound;
/** Sequence batch size */
private volatile int batchSize;
/** Synchronization lock for local value updates. */
private final Lock localUpdate = new ReentrantLock();
/** Synchronization for distributed sequence update. Acquired by threads with free topology (not in TX). */
private final ReentrantLock distUpdateFreeTop = new ReentrantLock();
/** Synchronization for distributed sequence update. Acquired by threads with locked topology (inside TX). */
private final ReentrantLock distUpdateLockedTop = new ReentrantLock();
/** Callable for execution {@link #incrementAndGet} operation in async and sync mode. */
private final Callable<Long> incAndGetCall = internalUpdate(1, true);
/** Callable for execution {@link #getAndIncrement} operation in async and sync mode. */
private final Callable<Long> getAndIncCall = internalUpdate(1, false);
/**
* Empty constructor required by {@link Externalizable}.
*/
public GridCacheAtomicSequenceImpl() {
// No-op.
}
/**
* Default constructor.
*
* @param name Sequence name.
* @param key Sequence key.
* @param seqView Sequence projection.
* @param batchSize Sequence batch size.
* @param locVal Local counter.
* @param upBound Upper bound.
*/
public GridCacheAtomicSequenceImpl(String name,
GridCacheInternalKey key,
IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView,
int batchSize,
long locVal,
long upBound)
{
super(name, key, seqView);
assert locVal <= upBound;
this.batchSize = batchSize;
this.upBound = upBound;
this.locVal = locVal;
}
/** {@inheritDoc} */
@Override public long get() {
checkRemoved();
return locVal;
}
/** {@inheritDoc} */
@Override public long incrementAndGet() {
try {
return internalUpdate(1, incAndGetCall, true);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** {@inheritDoc} */
@Override public long getAndIncrement() {
try {
return internalUpdate(1, getAndIncCall, false);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** {@inheritDoc} */
@Override public long addAndGet(long l) {
A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
try {
return internalUpdate(l, null, true);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** {@inheritDoc} */
@Override public long getAndAdd(long l) {
A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
try {
return internalUpdate(l, null, false);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/**
* Synchronous sequence update operation. Will add given amount to the sequence value.
*
* @param l Increment amount.
* @param updateCall Cache call that will update sequence reservation count in accordance with l.
* @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
* prior to update.
* @return Sequence value.
* @throws IgniteCheckedException If update failed.
*/
@SuppressWarnings("SignalWithoutCorrespondingAwait")
private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteCheckedException {
checkRemoved();
assert l > 0;
localUpdate.lock();
try {
// If reserved range isn't exhausted.
long locVal0 = locVal;
if (locVal0 + l <= upBound) {
locVal = locVal0 + l;
return updated ? locVal0 + l : locVal0;
}
}
finally {
localUpdate.unlock();
}
AffinityTopologyVersion lockedVer = ctx.shared().lockedTopologyVersion(null);
// We need two separate locks here because two independent thread may attempt to update the sequence
// simultaneously, one thread with locked topology and other with unlocked.
// We cannot use the same lock for both cases because it leads to a deadlock when free-topology thread
// waits for topology change, and locked topology thread waits to acquire the lock.
// If a thread has locked topology, it must bypass sync with non-locked threads, but at the same time
// we do not want multiple threads to attempt to run identical cache updates.
ReentrantLock distLock = lockedVer == null ? distUpdateFreeTop : distUpdateLockedTop;
distLock.lock();
try {
if (updateCall == null)
updateCall = internalUpdate(l, updated);
try {
return CU.retryTopologySafe(updateCall);
}
catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
throw e;
}
catch (Exception e) {
throw new IgniteCheckedException(e);
}
}
finally {
distLock.unlock();
}
}
/** Get local batch size for this sequences.
*
* @return Sequence batch size.
*/
@Override public int batchSize() {
return batchSize;
}
/**
* Set local batch size for this sequences.
*
* @param size Sequence batch size. Must be more then 0.
*/
@Override public void batchSize(int size) {
A.ensure(size > 0, " Batch size can't be less then 0: " + size);
localUpdate.lock();
try {
batchSize = size;
}
finally {
localUpdate.unlock();
}
}
/** {@inheritDoc} */
@Override protected void invalidateLocalState() {
locVal = 0;
upBound = -1;
}
/** {@inheritDoc} */
@Override public void close() {
try {
if (rmvd)
return;
ctx.kernalContext().dataStructures().removeSequence(name, ctx.group().name());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/**
* Method returns callable for execution all update operations in async and sync mode.
*
* @param l Value will be added to sequence.
* @param updated If {@code true}, will return updated value, if {@code false}, will return previous value.
* @return Callable for execution in async and sync mode.
*/
@SuppressWarnings("TooBroadScope")
private Callable<Long> internalUpdate(final long l, final boolean updated) {
return new Callable<Long>() {
@Override public Long call() throws Exception {
assert distUpdateFreeTop.isHeldByCurrentThread() || distUpdateLockedTop.isHeldByCurrentThread();
try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicSequenceValue seq = cacheView.get(key);
checkRemoved();
assert seq != null;
long curLocVal;
long newUpBound;
// Even though we hold a transaction lock here, we must hold the local update lock here as well
// because we mutate multipe variables (locVal and upBound).
localUpdate.lock();
try {
curLocVal = locVal;
// If local range was already reserved in another thread.
if (curLocVal + l <= upBound) {
locVal = curLocVal + l;
return updated ? curLocVal + l : curLocVal;
}
long curGlobalVal = seq.get();
long newLocVal;
/* We should use offset because we already reserved left side of range.*/
long off = batchSize > 1 ? batchSize - 1 : 1;
// Calculate new values for local counter, global counter and upper bound.
if (curLocVal + l >= curGlobalVal) {
newLocVal = curLocVal + l;
newUpBound = newLocVal + off;
}
else {
newLocVal = curGlobalVal;
newUpBound = newLocVal + off;
}
locVal = newLocVal;
upBound = newUpBound;
if (updated)
curLocVal = newLocVal;
}
finally {
localUpdate.unlock();
}
// Global counter must be more than reserved upper bound.
seq.set(newUpBound + 1);
cacheView.put(key, seq);
tx.commit();
return curLocVal;
}
catch (Error | Exception e) {
if(!X.hasCause(e, ClusterTopologyCheckedException.class))
U.error(log, "Failed to get and add: " + this, e);
throw e;
}
}
};
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(ctx.kernalContext());
out.writeUTF(name);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
IgniteBiTuple<GridKernalContext, String> t = stash.get();
t.set1((GridKernalContext)in.readObject());
t.set2(in.readUTF());
}
/**
* Reconstructs object on unmarshalling.
*
* @return Reconstructed object.
* @throws ObjectStreamException Thrown in case of unmarshalling error.
*/
private Object readResolve() throws ObjectStreamException {
try {
IgniteBiTuple<GridKernalContext, String> t = stash.get();
return t.get1().dataStructures().sequence(t.get2(), null, 0L, false);
}
catch (IgniteCheckedException e) {
throw U.withCause(new InvalidObjectException(e.getMessage()), e);
}
finally {
stash.remove();
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheAtomicSequenceImpl.class, this);
}
}