blob: 6abcfa81e41013c290b81a4df6f01bf3bf598d8c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.jetbrains.annotations.Nullable;
/**
* Cache store wrapper that ensures that there will be no more that one thread loading value from underlying store.
*/
public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> {
/** */
public static final int DFLT_LOAD_ALL_THRESHOLD = CacheConfiguration.DFLT_CONCURRENT_LOAD_ALL_THRESHOLD;
/** Delegate store. */
private CacheStore<K, V> delegate;
/** Pending cache store loads. */
private ConcurrentMap<K, LoadFuture> pendingLoads = new ConcurrentHashMap<>();
/** Load all threshold. */
private int loadAllThreshold = DFLT_LOAD_ALL_THRESHOLD;
/**
* @param delegate Delegate store.
*/
public CacheStoreBalancingWrapper(CacheStore<K, V> delegate) {
this.delegate = delegate;
}
/**
* @param delegate Delegate store.
* @param loadAllThreshold Load all threshold.
*/
public CacheStoreBalancingWrapper(CacheStore<K, V> delegate, int loadAllThreshold) {
this.delegate = delegate;
this.loadAllThreshold = loadAllThreshold;
}
/**
* @return Load all threshold.
*/
public int loadAllThreshold() {
return loadAllThreshold;
}
/** {@inheritDoc} */
@Nullable @Override public V load(K key) {
LoadFuture fut = pendingLoads.get(key);
try {
if (fut != null)
return fut.get(key);
fut = new LoadFuture();
LoadFuture old = pendingLoads.putIfAbsent(key, fut);
if (old != null)
return old.get(key);
}
catch (IgniteCheckedException e) {
throw new CacheLoaderException(e);
}
try {
V val = delegate.load(key);
fut.onComplete(key, val);
return val;
}
catch (Throwable e) {
fut.onError(key, e);
if (e instanceof Error)
throw e;
throw e;
}
}
/** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) {
delegate.loadCache(clo, args);
}
/** {@inheritDoc} */
@Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
assert false;
return delegate.loadAll(keys);
}
/**
* @param keys Keys to load.
* @param c Closure for loaded values.
*/
public void loadAll(Collection<? extends K> keys, final IgniteBiInClosure<K, V> c) {
assert keys.size() <= loadAllThreshold : loadAllThreshold;
Collection<K> needLoad = null;
Map<K, LoadFuture> pending = null;
LoadFuture span = null;
for (K key : keys) {
LoadFuture fut = pendingLoads.get(key);
if (fut != null) {
if (pending == null)
pending = new HashMap<>();
pending.put(key, fut);
}
else {
// Try to concurrently add pending future.
if (span == null)
span = new LoadFuture();
LoadFuture old = pendingLoads.putIfAbsent(key, span);
if (old != null) {
if (pending == null)
pending = new HashMap<>();
pending.put(key, old);
}
else {
if (needLoad == null)
needLoad = new ArrayList<>(keys.size());
needLoad.add(key);
}
}
}
if (needLoad != null) {
assert !needLoad.isEmpty();
assert span != null;
try {
Map<K, V> loaded = delegate.loadAll(needLoad);
if (loaded != null) {
for (Map.Entry<K, V> e : loaded.entrySet())
c.apply(e.getKey(), e.getValue());
}
span.onComplete(needLoad, loaded);
}
catch (Throwable e) {
span.onError(needLoad, e);
if (e instanceof Error)
throw e;
throw e;
}
}
if (pending != null) {
try {
for (Map.Entry<K, LoadFuture> e : pending.entrySet()) {
K key = e.getKey();
c.apply(key, e.getValue().get(key));
}
}
catch (IgniteCheckedException e) {
throw new CacheLoaderException(e);
}
}
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry<? extends K, ? extends V> entry) {
delegate.write(entry);
}
/** {@inheritDoc} */
@Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) {
delegate.writeAll(entries);
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
delegate.delete(key);
}
/** {@inheritDoc} */
@Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
delegate.deleteAll(keys);
}
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) {
delegate.sessionEnd(commit);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheStoreBalancingWrapper.class, this);
}
/**
*
*/
private class LoadFuture extends GridFutureAdapter<Map<K, V>> {
/** Collection of keys for pending cleanup. */
private volatile Collection<K> keys;
/**
*
*/
public LoadFuture() {
// No-op.
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Map<K, V> res, @Nullable Throwable err) {
if (super.onDone(res, err)) {
assert keys != null;
for (K key : keys)
pendingLoads.remove(key, this);
return true;
}
return false;
}
/**
* @param key Key.
* @param val Loaded value.
*/
public void onComplete(K key, V val) {
onComplete(Collections.singletonList(key), F.asMap(key, val));
}
/**
* @param keys Keys.
* @param res Loaded values.
*/
public void onComplete(Collection<K> keys, Map<K, V> res) {
this.keys = keys;
onDone(res);
}
/**
* @param key Key.
* @param err Error.
*/
public void onError(K key, Throwable err) {
this.keys = Collections.singletonList(key);
onDone(err);
}
/**
* @param keys Keys.
* @param err Error.
*/
public void onError(Collection<K> keys, Throwable err) {
this.keys = keys;
onDone(err);
}
/**
* Gets value loaded for key k.
*
* @param key Key to load.
* @return Loaded value (possibly {@code null}).
* @throws IgniteCheckedException If load failed.
*/
public V get(K key) throws IgniteCheckedException {
return get().get(key);
}
}
}