| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.datastructures; |
| |
| import java.io.Externalizable; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.util.AbstractCollection; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.NoSuchElementException; |
| import java.util.Set; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.MutableEntry; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteCompute; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteInterruptedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteQueue; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteCallable; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| |
| /** |
| * Common code for {@link IgniteQueue} implementation. |
| */ |
| public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> implements IgniteQueue<T> { |
| /** Value returned by closure updating queue header indicating that queue was removed. */ |
| protected static final long QUEUE_REMOVED_IDX = Long.MIN_VALUE; |
| |
| /** */ |
| private static final int DFLT_CLEAR_BATCH_SIZE = 100; |
| |
| /** Logger. */ |
| protected final IgniteLogger log; |
| |
| /** Cache context. */ |
| protected final GridCacheContext<?, ?> cctx; |
| |
| /** Cache. */ |
| protected final GridCacheAdapter cache; |
| |
| /** Queue name. */ |
| protected final String queueName; |
| |
| /** Queue header key. */ |
| protected final GridCacheQueueHeaderKey queueKey; |
| |
| /** Queue unique ID. */ |
| protected final IgniteUuid id; |
| |
| /** Queue capacity. */ |
| private final int cap; |
| |
| /** Collocation flag. */ |
| private final boolean collocated; |
| |
| /** Removed flag. */ |
| private volatile boolean rmvd; |
| |
| /** Read blocking operations semaphore. */ |
| @GridToStringExclude |
| private final Semaphore readSem; |
| |
| /** Write blocking operations semaphore. */ |
| @GridToStringExclude |
| private final Semaphore writeSem; |
| |
| /** Access to affinityRun() and affinityCall() functions. */ |
| private final IgniteCompute compute; |
| |
| /** |
| * @param queueName Queue name. |
| * @param hdr Queue hdr. |
| * @param cctx Cache context. |
| */ |
| @SuppressWarnings("unchecked") |
| protected GridCacheQueueAdapter(String queueName, GridCacheQueueHeader hdr, GridCacheContext<?, ?> cctx) { |
| this.cctx = cctx; |
| this.queueName = queueName; |
| id = hdr.id(); |
| cap = hdr.capacity(); |
| collocated = hdr.collocated(); |
| queueKey = new GridCacheQueueHeaderKey(queueName); |
| cache = cctx.kernalContext().cache().internalCache(cctx.name()); |
| this.compute = cctx.kernalContext().grid().compute(); |
| |
| log = cctx.logger(getClass()); |
| |
| readSem = new Semaphore(hdr.size(), true); |
| |
| writeSem = bounded() ? new Semaphore(hdr.capacity() - hdr.size(), true) : null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String name() { |
| return queueName; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean add(T item) { |
| A.notNull(item, "item"); |
| |
| return offer(item); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean collocated() { |
| return collocated; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int capacity() { |
| return cap; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean bounded() { |
| return cap < Integer.MAX_VALUE; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public int size() { |
| try { |
| GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); |
| |
| checkRemoved(hdr); |
| |
| return hdr.size(); |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Nullable @Override public T peek() throws IgniteException { |
| try { |
| while (true) { |
| GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); |
| |
| checkRemoved(hdr); |
| |
| if (hdr.empty()) |
| return null; |
| |
| T val = (T)cache.get(itemKey(hdr.head())); |
| |
| if (val == null) |
| // Header might have been polled. Retry. |
| continue; |
| |
| return val; |
| } |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public T remove() { |
| T res = poll(); |
| |
| if (res == null) |
| throw new NoSuchElementException(); |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public T element() { |
| T el = peek(); |
| |
| if (el == null) |
| throw new NoSuchElementException(); |
| |
| return el; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public Iterator<T> iterator() { |
| try { |
| GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); |
| |
| checkRemoved(hdr); |
| |
| return new QueueIterator(hdr); |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void put(T item) throws IgniteException { |
| A.notNull(item, "item"); |
| |
| if (!bounded()) { |
| boolean offer = offer(item); |
| |
| assert offer; |
| |
| return; |
| } |
| |
| while (true) { |
| try { |
| writeSem.acquire(); |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| |
| throw new IgniteInterruptedException("Queue put interrupted.", e); |
| } |
| |
| checkStopping(); |
| |
| if (offer(item)) |
| return; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean offer(T item, long timeout, TimeUnit unit) throws IgniteException { |
| A.notNull(item, "item"); |
| A.ensure(timeout >= 0, "Timeout cannot be negative: " + timeout); |
| |
| if (!bounded()) { |
| boolean offer = offer(item); |
| |
| assert offer; |
| |
| return true; |
| } |
| |
| long end = U.currentTimeMillis() + MILLISECONDS.convert(timeout, unit); |
| |
| while (U.currentTimeMillis() < end) { |
| boolean retVal = false; |
| |
| try { |
| if (writeSem.tryAcquire(end - U.currentTimeMillis(), MILLISECONDS)) { |
| checkStopping(); |
| |
| retVal = offer(item); |
| } |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| |
| throw new IgniteInterruptedException("Queue put interrupted.", e); |
| } |
| |
| if (retVal) |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public T take() throws IgniteException { |
| while (true) { |
| try { |
| readSem.acquire(); |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| |
| throw new IgniteInterruptedException("Queue take interrupted.", e); |
| } |
| |
| checkStopping(); |
| |
| T e = poll(); |
| |
| if (e != null) |
| return e; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public T poll(long timeout, TimeUnit unit) throws IgniteException { |
| A.ensure(timeout >= 0, "Timeout cannot be negative: " + timeout); |
| |
| long end = U.currentTimeMillis() + MILLISECONDS.convert(timeout, unit); |
| |
| while (U.currentTimeMillis() < end) { |
| T retVal = null; |
| |
| try { |
| if (readSem.tryAcquire(end - U.currentTimeMillis(), MILLISECONDS)) { |
| checkStopping(); |
| |
| retVal = poll(); |
| } |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| |
| throw new IgniteInterruptedException("Queue poll interrupted.", e); |
| } |
| |
| if (retVal != null) |
| return retVal; |
| } |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int remainingCapacity() { |
| if (!bounded()) |
| return Integer.MAX_VALUE; |
| |
| int remaining = cap - size(); |
| |
| return remaining > 0 ? remaining : 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void clear() { |
| clear(DFLT_CLEAR_BATCH_SIZE); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public void clear(int batchSize) throws IgniteException { |
| A.ensure(batchSize >= 0, "Batch size cannot be negative: " + batchSize); |
| |
| try { |
| IgniteBiTuple<Long, Long> t = |
| (IgniteBiTuple<Long, Long>)cache.invoke(queueKey, new ClearProcessor(id)).get(); |
| |
| if (t == null) |
| return; |
| |
| checkRemoved(t.get1()); |
| |
| removeKeys(cache, id, queueName, collocated, t.get1(), t.get2(), batchSize); |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int drainTo(Collection<? super T> c) { |
| return drainTo(c, Integer.MAX_VALUE); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int drainTo(Collection<? super T> c, int maxElements) { |
| int max = Math.min(maxElements, size()); |
| |
| for (int i = 0; i < max; i++) { |
| T el = poll(); |
| |
| if (el == null) |
| return i; |
| |
| c.add(el); |
| } |
| |
| return max; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean removed() { |
| return rmvd; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void affinityRun(IgniteRunnable job) { |
| if (!collocated) |
| throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() + |
| ". This operation is supported only for collocated queues."); |
| |
| compute.affinityRun(cache.name(), queueKey, job); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <R> R affinityCall(IgniteCallable<R> job) { |
| if (!collocated) |
| throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() + |
| ". This operation is supported only for collocated queues."); |
| |
| return compute.affinityCall(cache.name(), queueKey, job); |
| } |
| |
| /** |
| * @param cache Queue cache. |
| * @param id Queue unique ID. |
| * @param name Queue name. |
| * @param collocated Collocation flag. |
| * @param startIdx Start item index. |
| * @param endIdx End item index. |
| * @param batchSize Batch size. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("unchecked") |
| static void removeKeys( |
| GridCacheAdapter cache, |
| IgniteUuid id, |
| String name, |
| boolean collocated, |
| long startIdx, |
| long endIdx, |
| int batchSize) |
| throws IgniteCheckedException { |
| Set<QueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10); |
| |
| for (long idx = startIdx; idx < endIdx; idx++) { |
| keys.add(itemKey(id, name, collocated, idx)); |
| |
| if (batchSize > 0 && keys.size() == batchSize) { |
| cache.removeAll(keys); |
| |
| keys.clear(); |
| } |
| } |
| |
| if (!keys.isEmpty()) |
| cache.removeAll(keys); |
| } |
| |
| /** |
| * Checks result of closure modifying queue header, throws {@link IllegalStateException} if queue was removed. |
| * |
| * @param idx Result of closure execution. |
| */ |
| protected final void checkRemoved(Long idx) { |
| if (idx == QUEUE_REMOVED_IDX) |
| onRemoved(true); |
| } |
| |
| /** |
| * Checks queue state, throws {@link IllegalStateException} if queue was removed. |
| * |
| * @param hdr Queue hdr. |
| */ |
| protected final void checkRemoved(@Nullable GridCacheQueueHeader hdr) { |
| if (queueRemoved(hdr, id)) |
| onRemoved(true); |
| } |
| |
| /** Release all semaphores used in blocking operations in case of client disconnect. */ |
| public void onClientDisconnected() { |
| releaseSemaphores(); |
| } |
| |
| /** |
| * Marks queue as removed. |
| * |
| * @param throw0 If {@code true} then throws {@link IllegalStateException}. |
| */ |
| public void onRemoved(boolean throw0) { |
| rmvd = true; |
| |
| releaseSemaphores(); |
| |
| if (throw0) |
| throw new IllegalStateException("Queue has been removed from cache: " + this); |
| } |
| |
| /** |
| * Release all semaphores used in blocking operations (used in case queue was removed or grid is stopping). |
| */ |
| private void releaseSemaphores() { |
| if (bounded()) { |
| writeSem.drainPermits(); |
| writeSem.release(1_000_000); // Let all blocked threads to proceed (operation will fail with exception). |
| } |
| |
| readSem.drainPermits(); |
| readSem.release(1_000_000); // Let all blocked threads to proceed (operation will fail with exception). |
| } |
| |
| /** |
| * @param hdr Queue header. |
| */ |
| public void onHeaderChanged(GridCacheQueueHeader hdr) { |
| if (!hdr.empty()) { |
| readSem.drainPermits(); |
| readSem.release(hdr.size()); |
| } |
| |
| if (bounded()) { |
| writeSem.drainPermits(); |
| |
| if (!hdr.full()) |
| writeSem.release(hdr.capacity() - hdr.size()); |
| } |
| } |
| |
| /** |
| * Grid stop callback. |
| */ |
| public void onKernalStop() { |
| releaseSemaphores(); |
| } |
| |
| /** |
| * Throws {@link IgniteException} in case if grid is stopping. |
| */ |
| private void checkStopping() { |
| if (cctx.kernalContext().isStopping()) |
| throw new IgniteException("Ignite is stopping"); |
| } |
| |
| /** |
| * @return Queue unique ID. |
| */ |
| public IgniteUuid id() { |
| return id; |
| } |
| |
| /** |
| * Removes item with given index from queue. |
| * |
| * @param rmvIdx Index of item to be removed. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected abstract void removeItem(long rmvIdx) throws IgniteCheckedException; |
| |
| /** |
| * @param idx Item index. |
| * @return Item key. |
| */ |
| protected QueueItemKey itemKey(Long idx) { |
| return itemKey(id, queueName, collocated(), idx); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public void close() { |
| if (rmvd) |
| return; |
| |
| try { |
| cctx.kernalContext().dataStructures().removeQueue(queueName, cctx); |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| |
| /** |
| * @param id Queue unique ID. |
| * @param queueName Queue name. |
| * @param collocated Collocation flag. |
| * @param idx Item index. |
| * @return Item key. |
| */ |
| private static QueueItemKey itemKey(IgniteUuid id, |
| String queueName, |
| boolean collocated, |
| long idx) { |
| return collocated ? |
| new CollocatedQueueItemKey(id, queueName, idx) : new GridCacheQueueItemKey(id, queueName, idx); |
| } |
| |
| /** |
| * @param hdr Queue header. |
| * @param id Expected queue unique ID. |
| * @return {@code True} if queue was removed. |
| */ |
| private static boolean queueRemoved(@Nullable GridCacheQueueHeader hdr, IgniteUuid id) { |
| return hdr == null || !id.equals(hdr.id()); |
| } |
| |
| /** |
| * |
| */ |
| private class QueueIterator implements Iterator<T> { |
| /** */ |
| private T next; |
| |
| /** */ |
| private T cur; |
| |
| /** */ |
| private long curIdx; |
| |
| /** */ |
| private long idx; |
| |
| /** */ |
| private long endIdx; |
| |
| /** */ |
| private Set<Long> rmvIdxs; |
| |
| /** |
| * @param hdr Queue header. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("unchecked") |
| private QueueIterator(GridCacheQueueHeader hdr) throws IgniteCheckedException { |
| idx = hdr.head(); |
| endIdx = hdr.tail(); |
| rmvIdxs = hdr.removedIndexes(); |
| |
| assert !F.contains(rmvIdxs, idx) : idx; |
| |
| if (idx < endIdx) |
| next = (T)cache.get(itemKey(idx)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasNext() { |
| return next != null; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public T next() { |
| if (next == null) |
| throw new NoSuchElementException(); |
| |
| try { |
| cur = next; |
| curIdx = idx; |
| |
| idx++; |
| |
| if (rmvIdxs != null) { |
| while (F.contains(rmvIdxs, idx) && idx < endIdx) |
| idx++; |
| } |
| |
| next = idx < endIdx ? (T)cache.get(itemKey(idx)) : null; |
| |
| return cur; |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove() { |
| if (cur == null) |
| throw new IllegalStateException(); |
| |
| try { |
| removeItem(curIdx); |
| |
| cur = null; |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| } |
| |
| /** |
| */ |
| protected static class ClearProcessor implements |
| EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, IgniteBiTuple<Long, Long>>, Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private IgniteUuid id; |
| |
| /** |
| * Required by {@link Externalizable}. |
| */ |
| public ClearProcessor() { |
| // No-op. |
| } |
| |
| /** |
| * @param id Queue unique ID. |
| */ |
| public ClearProcessor(IgniteUuid id) { |
| this.id = id; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteBiTuple<Long, Long> process( |
| MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) { |
| GridCacheQueueHeader hdr = e.getValue(); |
| |
| boolean rmvd = queueRemoved(hdr, id); |
| |
| if (rmvd) |
| return new IgniteBiTuple<>(QUEUE_REMOVED_IDX, QUEUE_REMOVED_IDX); |
| else if (hdr.empty()) |
| return null; |
| |
| GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), |
| hdr.capacity(), |
| hdr.collocated(), |
| hdr.tail(), |
| hdr.tail(), |
| null); |
| |
| e.setValue(newHdr); |
| |
| return new IgniteBiTuple<>(hdr.head(), hdr.tail()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| U.writeGridUuid(out, id); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| id = U.readGridUuid(in); |
| } |
| } |
| |
| /** |
| */ |
| protected static class PollProcessor implements |
| EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long>, Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private IgniteUuid id; |
| |
| /** |
| * Required by {@link Externalizable}. |
| */ |
| public PollProcessor() { |
| // No-op. |
| } |
| |
| /** |
| * @param id Queue unique ID. |
| */ |
| public PollProcessor(IgniteUuid id) { |
| this.id = id; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Long process( |
| MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) { |
| GridCacheQueueHeader hdr = e.getValue(); |
| |
| boolean rmvd = queueRemoved(hdr, id); |
| |
| if (rmvd || hdr.empty()) |
| return rmvd ? QUEUE_REMOVED_IDX : null; |
| |
| Set<Long> rmvdIdxs = hdr.removedIndexes(); |
| |
| if (rmvdIdxs == null) { |
| GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), |
| hdr.capacity(), |
| hdr.collocated(), |
| hdr.head() + 1, |
| hdr.tail(), |
| null); |
| |
| e.setValue(newHdr); |
| |
| return hdr.head(); |
| } |
| |
| long next = hdr.head(); |
| |
| rmvdIdxs = new HashSet<>(rmvdIdxs); |
| |
| do { |
| if (!rmvdIdxs.remove(next)) { |
| GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), |
| hdr.capacity(), |
| hdr.collocated(), |
| next + 1, |
| hdr.tail(), |
| rmvdIdxs.isEmpty() ? null : rmvdIdxs); |
| |
| e.setValue(newHdr); |
| |
| return next; |
| } |
| |
| next++; |
| } while (next != hdr.tail()); |
| |
| GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), |
| hdr.capacity(), |
| hdr.collocated(), |
| next, |
| hdr.tail(), |
| rmvdIdxs.isEmpty() ? null : rmvdIdxs); |
| |
| e.setValue(newHdr); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| U.writeGridUuid(out, id); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| id = U.readGridUuid(in); |
| } |
| } |
| |
| /** |
| */ |
| protected static class AddProcessor implements |
| EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long>, Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private IgniteUuid id; |
| |
| /** */ |
| private int size; |
| |
| /** |
| * Required by {@link Externalizable}. |
| */ |
| public AddProcessor() { |
| // No-op. |
| } |
| |
| /** |
| * @param id Queue unique ID. |
| * @param size Number of elements to add. |
| */ |
| public AddProcessor(IgniteUuid id, int size) { |
| this.id = id; |
| this.size = size; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Long process(MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) { |
| GridCacheQueueHeader hdr = e.getValue(); |
| |
| boolean rmvd = queueRemoved(hdr, id); |
| |
| if (rmvd || !spaceAvailable(hdr, size)) |
| return rmvd ? QUEUE_REMOVED_IDX : null; |
| |
| GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), |
| hdr.capacity(), |
| hdr.collocated(), |
| hdr.head(), |
| hdr.tail() + size, |
| hdr.removedIndexes()); |
| |
| e.setValue(newHdr); |
| |
| return hdr.tail(); |
| } |
| |
| /** |
| * @param hdr Queue header. |
| * @param size Number of elements to add. |
| * @return {@code True} if new elements can be added. |
| */ |
| private boolean spaceAvailable(GridCacheQueueHeader hdr, int size) { |
| return !hdr.bounded() || (hdr.size() + size) <= hdr.capacity(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| U.writeGridUuid(out, id); |
| out.writeInt(size); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| id = U.readGridUuid(in); |
| size = in.readInt(); |
| } |
| } |
| |
| /** |
| */ |
| protected static class RemoveProcessor implements |
| EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long>, Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private IgniteUuid id; |
| |
| /** */ |
| private Long idx; |
| |
| /** |
| * Required by {@link Externalizable}. |
| */ |
| public RemoveProcessor() { |
| // No-op. |
| } |
| |
| /** |
| * @param id Queue UUID. |
| * @param idx Index of item to be removed. |
| */ |
| public RemoveProcessor(IgniteUuid id, Long idx) { |
| this.id = id; |
| this.idx = idx; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Long process(MutableEntry<GridCacheQueueHeaderKey, GridCacheQueueHeader> e, Object... args) { |
| GridCacheQueueHeader hdr = e.getValue(); |
| |
| boolean rmvd = queueRemoved(hdr, id); |
| |
| if (rmvd || hdr.empty() || idx < hdr.head()) |
| return rmvd ? QUEUE_REMOVED_IDX : null; |
| |
| if (idx == hdr.head()) { |
| Set<Long> rmvIdxs = hdr.removedIndexes(); |
| |
| long head = hdr.head() + 1; |
| |
| if (!F.contains(rmvIdxs, head)) { |
| GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), |
| hdr.capacity(), |
| hdr.collocated(), |
| head, |
| hdr.tail(), |
| hdr.removedIndexes()); |
| |
| e.setValue(newHdr); |
| |
| return idx; |
| } |
| |
| rmvIdxs = new HashSet<>(rmvIdxs); |
| |
| while (rmvIdxs.remove(head)) |
| head++; |
| |
| GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), |
| hdr.capacity(), |
| hdr.collocated(), |
| head, |
| hdr.tail(), |
| rmvIdxs.isEmpty() ? null : rmvIdxs); |
| |
| e.setValue(newHdr); |
| |
| return null; |
| } |
| |
| Set<Long> rmvIdxs = hdr.removedIndexes(); |
| |
| if (rmvIdxs == null) { |
| rmvIdxs = new HashSet<>(); |
| |
| rmvIdxs.add(idx); |
| } |
| else { |
| if (!rmvIdxs.contains(idx)) { |
| rmvIdxs = new HashSet<>(rmvIdxs); |
| |
| rmvIdxs.add(idx); |
| } |
| else |
| idx = null; |
| } |
| |
| GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(), |
| hdr.capacity(), |
| hdr.collocated(), |
| hdr.head(), |
| hdr.tail(), |
| rmvIdxs); |
| |
| e.setValue(newHdr); |
| |
| return idx; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| U.writeGridUuid(out, id); |
| out.writeLong(idx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| id = U.readGridUuid(in); |
| idx = in.readLong(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| |
| if (o == null || getClass() != o.getClass()) |
| return false; |
| |
| GridCacheQueueAdapter that = (GridCacheQueueAdapter)o; |
| |
| return id.equals(that.id); |
| |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return id.hashCode(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridCacheQueueAdapter.class, this); |
| } |
| } |