blob: 48a0da4f4f7aeef14154b92097e591f1def97aab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
* @param <V> Type for cache query future.
*/
public class CacheWeakQueryIteratorsHolder<V> {
/** Iterators weak references queue. */
private final ReferenceQueue refQueue = new ReferenceQueue();
/** Iterators futures. */
private final Map<WeakReference, AutoCloseable> refs = new ConcurrentHashMap<>();
/** Logger. */
private final IgniteLogger log;
/**
* @param log Logger.
*/
public CacheWeakQueryIteratorsHolder(IgniteLogger log) {
this.log = log;
}
/**
* @param fut Query to iterate.
* @param convert Cache iterator converter.
* @param <T> Type for the iterator.
* @return Iterator over the cache.
*/
public <T> WeakReferenceCloseableIterator<T> iterator(final CacheQueryFuture<V> fut,
CacheIteratorConverter<T, V> convert) {
WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut, convert);
AutoCloseable old = refs.put(it.weakReference(), fut);
assert old == null;
return it;
}
/**
* @param iter Closeable iterator.
* @param <T> Type for the iterator.
* @return Iterator over the cache.
*/
public <T> WeakReferenceCloseableIterator<T> iterator(final GridCloseableIterator<V> iter,
CacheIteratorConverter<T, V> convert) {
WeakQueryCloseableIterator it = new WeakQueryCloseableIterator(iter, convert);
AutoCloseable old = refs.put(it.weakReference(), iter);
assert old == null;
return it;
}
/**
* @param it Iterator.
*
* @throws IgniteCheckedException If failed.
*/
public void removeIterator(WeakReferenceCloseableIterator it) throws IgniteCheckedException {
refs.remove(it.weakReference());
it.close();
}
/**
* Closes unreachable iterators.
*/
public void checkWeakQueue() {
for (Reference itRef = refQueue.poll(); itRef != null;
itRef = refQueue.poll()) {
try {
WeakReference weakRef = (WeakReference)itRef;
AutoCloseable rsrc = refs.remove(weakRef);
if (rsrc != null)
rsrc.close();
}
catch (Exception e) {
U.error(log, "Failed to close iterator.", e);
}
}
}
/**
* Cancel all cache queries.
*/
public void clearQueries() {
for (AutoCloseable rsrc : refs.values()) {
try {
rsrc.close();
}
catch (Exception e) {
U.error(log, "Failed to close iterator.", e);
}
}
refs.clear();
}
/**
* Iterator based of {@link CacheQueryFuture}.
*
* @param <T> Type for iterator.
*/
private class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T>
implements WeakReferenceCloseableIterator<T> {
/** */
private static final long serialVersionUID = 0L;
/** Query future. */
private final CacheQueryFuture<V> fut;
/** Weak reference. */
private final WeakReference<WeakQueryFutureIterator<T>> weakRef;
/** */
private final CacheIteratorConverter<T, V> convert;
/** Init flag. */
private boolean init;
/** Next item. */
private T next;
/** Current item. */
private T cur;
/**
* @param fut GridCacheQueryFuture to iterate.
* @param convert Converter.
*/
WeakQueryFutureIterator(CacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) {
this.fut = fut;
this.weakRef = new WeakReference<>(this, refQueue);
this.convert = convert;
}
/** {@inheritDoc} */
@Override public T onNext() throws IgniteCheckedException {
init();
if (next == null) {
clearWeakReference();
throw new NoSuchElementException();
}
cur = next;
V futNext = fut.next();
if (futNext == null)
clearWeakReference();
next = futNext != null ? convert.convert(futNext) : null;
return cur;
}
/** {@inheritDoc} */
@Override public boolean onHasNext() throws IgniteCheckedException {
init();
boolean hasNext = next != null;
if (!hasNext)
clearWeakReference();
return hasNext;
}
/** {@inheritDoc} */
@Override protected void onClose() throws IgniteCheckedException {
fut.cancel();
clearWeakReference();
}
/** {@inheritDoc} */
@Override protected void onRemove() throws IgniteCheckedException {
if (cur == null)
throw new IllegalStateException();
convert.remove(cur);
cur = null;
}
/** {@inheritDoc} */
@Override public WeakReference<WeakQueryFutureIterator<T>> weakReference() {
return weakRef;
}
/**
* Clears weak reference.
*/
private void clearWeakReference() {
weakRef.clear();
refs.remove(weakRef);
}
/**
* @throws IgniteCheckedException If failed.
*/
private void init() throws IgniteCheckedException {
if (!init) {
V futNext = fut.next();
next = futNext != null ? convert.convert(futNext) : null;
init = true;
}
}
}
/**
* @param <T> Type.
*/
public class WeakQueryCloseableIterator<T> extends GridCloseableIteratorAdapter<T>
implements WeakReferenceCloseableIterator<T> {
/** */
private static final long serialVersionUID = 0;
/** */
private final GridCloseableIterator<V> iter;
/** */
private final CacheIteratorConverter<T, V> convert;
/** */
private final WeakReference weakRef;
/** */
private T cur;
/**
* @param iter Iterator.
* @param convert Converter.
*/
WeakQueryCloseableIterator(GridCloseableIterator<V> iter, CacheIteratorConverter<T, V> convert) {
this.iter = iter;
this.convert = convert;
weakRef = new WeakReference(this, refQueue);
}
/** {@inheritDoc} */
@Override protected T onNext() throws IgniteCheckedException {
V next;
try {
next = iter.nextX();
}
catch (NoSuchElementException e) {
clearWeakReference();
throw e;
}
if (next == null)
clearWeakReference();
cur = next != null ? convert.convert(next) : null;
return cur;
}
/** {@inheritDoc} */
@Override protected boolean onHasNext() throws IgniteCheckedException {
boolean hasNextX = iter.hasNextX();
if (!hasNextX)
clearWeakReference();
return hasNextX;
}
/** {@inheritDoc} */
@Override protected void onRemove() throws IgniteCheckedException {
if (cur == null)
throw new IllegalStateException();
convert.remove(cur);
cur = null;
}
/** {@inheritDoc} */
@Override protected void onClose() throws IgniteCheckedException {
iter.close();
clearWeakReference();
}
/**
* Clears weak reference.
*/
private void clearWeakReference() {
weakRef.clear();
refs.remove(weakRef);
}
/** {@inheritDoc} */
@Override public WeakReference weakReference() {
return weakRef;
}
}
/**
*
*/
public static interface WeakReferenceCloseableIterator<T> extends GridCloseableIterator<T> {
/**
* @return Iterator weak reference.
*/
public WeakReference weakReference();
}
}