blob: 64a052d8b3e647aa987fb111f2fa071234009a9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.store;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper;
import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridEmptyIterator;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.GridSetWrapper;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* Store manager.
*/
@SuppressWarnings({"AssignmentToCatchBlockParameter", "unchecked"})
public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapter implements CacheStoreManager {
/** */
private static final int SES_ATTR = GridMetadataAwareAdapter.EntryKey.CACHE_STORE_MANAGER_KEY.key();
/** */
protected CacheStore<Object, Object> store;
/** */
protected CacheStore<?, ?> cfgStore;
/** */
private CacheStoreBalancingWrapper<Object, Object> singleThreadGate;
/** */
private ThreadLocal<SessionData> sesHolder;
/** */
private ThreadLocalSession locSes;
/** */
private boolean locStore;
/** */
private boolean writeThrough;
/** */
private boolean readThrough;
/** */
private Collection<CacheStoreSessionListener> sesLsnrs;
/** */
private boolean globalSesLsnrs;
/** Always keep binary. */
protected boolean alwaysKeepBinary;
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException {
GridKernalContext ctx = igniteContext();
CacheConfiguration cfg = cacheConfiguration();
writeThrough = cfg.isWriteThrough();
readThrough = cfg.isReadThrough();
this.cfgStore = cfgStore;
store = cacheStoreWrapper(ctx, cfgStore, cfg);
singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store,
cfg.getStoreConcurrentLoadAllThreshold());
ThreadLocal<SessionData> sesHolder0 = null;
if (cfgStore != null) {
sesHolder0 = ((Map<CacheStore, ThreadLocal>)sesHolders).get(cfgStore);
if (sesHolder0 == null) {
sesHolder0 = new ThreadLocal<>();
locSes = new ThreadLocalSession(sesHolder0);
if (ctx.resource().injectStoreSession(cfgStore, locSes))
sesHolders.put(cfgStore, sesHolder0);
}
else
locSes = new ThreadLocalSession(sesHolder0);
}
sesHolder = sesHolder0;
locStore = U.hasAnnotation(cfgStore, CacheLocalStore.class);
if (cfgStore instanceof CacheJdbcPojoStore)
alwaysKeepBinary = true;
}
/** {@inheritDoc} */
@Override public boolean isWriteThrough() {
return writeThrough;
}
/**
* Creates a wrapped cache store if write-behind cache is configured.
*
* @param ctx Kernal context.
* @param cfgStore Store provided in configuration.
* @param cfg Cache configuration.
* @return Instance if {@link GridCacheWriteBehindStore} if write-behind store is configured,
* or user-defined cache store.
*/
@SuppressWarnings({"unchecked"})
private CacheStore cacheStoreWrapper(GridKernalContext ctx,
@Nullable CacheStore cfgStore,
CacheConfiguration cfg) {
if (cfgStore == null || !cfg.isWriteBehindEnabled())
return cfgStore;
GridCacheWriteBehindStore store = new GridCacheWriteBehindStore(this,
ctx.igniteInstanceName(),
cfg.getName(),
ctx.log(GridCacheWriteBehindStore.class),
cfgStore);
store.setFlushSize(cfg.getWriteBehindFlushSize());
store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount());
store.setFlushFrequency(cfg.getWriteBehindFlushFrequency());
store.setBatchSize(cfg.getWriteBehindBatchSize());
store.setWriteCoalescing(cfg.getWriteBehindCoalescing());
return store;
}
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
if (store instanceof LifecycleAware) {
try {
// Avoid second start() call on store in case when near cache is enabled.
if (cctx.config().isWriteBehindEnabled()) {
if (!cctx.isNear())
((LifecycleAware)store).start();
}
}
catch (Exception e) {
throw new IgniteCheckedException("Failed to start cache store: " + e, e);
}
}
CacheConfiguration cfg = cctx.config();
if (cfgStore != null) {
if (!cfg.isWriteThrough() && !cfg.isReadThrough()) {
U.quietAndWarn(log,
"Persistence store is configured, but both read-through and write-through are disabled. This " +
"configuration makes sense if the store implements loadCache method only. If this is the " +
"case, ignore this warning. Otherwise, fix the configuration for the cache: " + cfg.getName(),
"Persistence store is configured, but both read-through and write-through are disabled " +
"for cache: " + cfg.getName());
}
if (!cfg.isWriteThrough() && cfg.isWriteBehindEnabled()) {
U.quietAndWarn(log,
"To enable write-behind mode for the cache store it's also required to set " +
"CacheConfiguration.setWriteThrough(true) property, otherwise the persistence " +
"store will be never updated. Consider fixing configuration for the cache: " + cfg.getName(),
"Write-behind mode for the cache store also requires CacheConfiguration.setWriteThrough(true) " +
"property. Fix configuration for the cache: " + cfg.getName());
}
if (cctx.group().persistenceEnabled() && (cfg.isWriteThrough() || cfg.isReadThrough()))
U.quietAndWarn(log,
"Both Ignite native persistence and CacheStore are configured for cache '" + cfg.getName() + "'. " +
"This configuration does not guarantee strict consistency between CacheStore and Ignite data " +
"storage upon restarts. Consult documentation for more details.");
}
sesLsnrs = CU.startStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories());
if (sesLsnrs == null) {
sesLsnrs = cctx.shared().storeSessionListeners();
globalSesLsnrs = true;
}
}
/** {@inheritDoc} */
@Override protected void stop0(boolean cancel, boolean destroy) {
if (store instanceof LifecycleAware) {
try {
// Avoid second start() call on store in case when near cache is enabled.
if (cctx.config().isWriteBehindEnabled()) {
if (!cctx.isNear())
((LifecycleAware)store).stop();
}
}
catch (Exception e) {
U.error(log(), "Failed to stop cache store.", e);
}
}
if (!globalSesLsnrs) {
try {
CU.stopStoreSessionListeners(cctx.kernalContext(), sesLsnrs);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to stop store session listeners for cache: " + cctx.name(), e);
}
}
}
/** {@inheritDoc} */
@Override public boolean isLocal() {
return locStore;
}
/** {@inheritDoc} */
@Override public boolean configured() {
return store != null;
}
/** {@inheritDoc} */
@Override public CacheStore<?, ?> configuredStore() {
return cfgStore;
}
/** {@inheritDoc} */
@Override @Nullable public final Object load(@Nullable IgniteInternalTx tx, KeyCacheObject key)
throws IgniteCheckedException {
return loadFromStore(tx, key, true);
}
/**
* Loads data from persistent store.
*
* @param tx Cache transaction.
* @param key Cache key.
* @param convert Convert flag.
* @return Loaded value, possibly <tt>null</tt>.
* @throws IgniteCheckedException If data loading failed.
*/
@Nullable private Object loadFromStore(@Nullable IgniteInternalTx tx,
KeyCacheObject key,
boolean convert)
throws IgniteCheckedException {
if (store != null) {
if (key.internal())
// Never load internal keys from store as they are never persisted.
return null;
Object storeKey = cctx.unwrapBinaryIfNeeded(key, !convertBinary());
if (log.isDebugEnabled())
log.debug(S.toString("Loading value from store for key",
"key", storeKey, true));
sessionInit0(tx, StoreOperation.READ, false);
boolean threwEx = true;
Object val = null;
try {
val = singleThreadGate.load(storeKey);
threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
}
catch (CacheLoaderException e) {
throw new IgniteCheckedException(e);
}
catch (Exception e) {
throw new IgniteCheckedException(new CacheLoaderException(e));
}
finally {
IgniteInternalTx tx0 = tx;
if (tx0 != null && (tx0.dht() && tx0.local()))
tx0 = null;
sessionEnd0(tx0, threwEx);
}
if (log.isDebugEnabled())
log.debug(S.toString("Loaded value from store",
"key", key, true,
"val", val, true));
if (convert) {
val = convert(val);
return val;
}
else
return val;
}
return null;
}
/**
* @param val Internal value.
* @return User value.
*/
private Object convert(Object val) {
if (val == null)
return null;
return locStore ? ((IgniteBiTuple<Object, GridCacheVersion>)val).get1() : val;
}
/** {@inheritDoc} */
@Override public boolean isWriteBehind() {
return cctx.config().isWriteBehindEnabled();
}
/** {@inheritDoc} */
@Override public boolean isWriteToStoreFromDht() {
return isWriteBehind() || locStore;
}
/** {@inheritDoc} */
@Override public final void localStoreLoadAll(@Nullable IgniteInternalTx tx, Collection keys, GridInClosure3 vis)
throws IgniteCheckedException {
assert store != null;
assert locStore;
loadAllFromStore(tx, keys, null, vis);
}
/** {@inheritDoc} */
@Override public final boolean loadAll(@Nullable IgniteInternalTx tx, Collection keys, IgniteBiInClosure vis)
throws IgniteCheckedException {
if (store != null) {
loadAllFromStore(tx, keys, vis, null);
return true;
}
else {
for (Object key : keys)
vis.apply(key, null);
}
return false;
}
/**
* @param tx Cache transaction.
* @param keys Keys to load.
* @param vis Key/value closure (only one of vis or verVis can be specified).
* @param verVis Key/value/version closure (only one of vis or verVis can be specified).
* @throws IgniteCheckedException If failed.
*/
private void loadAllFromStore(@Nullable IgniteInternalTx tx,
Collection<? extends KeyCacheObject> keys,
@Nullable final IgniteBiInClosure<KeyCacheObject, Object> vis,
@Nullable final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> verVis)
throws IgniteCheckedException {
assert vis != null ^ verVis != null;
assert verVis == null || locStore;
final boolean convert = verVis == null;
if (!keys.isEmpty()) {
if (keys.size() == 1) {
KeyCacheObject key = F.first(keys);
if (convert)
vis.apply(key, load(tx, key));
else {
IgniteBiTuple<Object, GridCacheVersion> t =
(IgniteBiTuple<Object, GridCacheVersion>)loadFromStore(tx, key, false);
if (t != null)
verVis.apply(key, t.get1(), t.get2());
}
return;
}
Collection<Object> keys0 = F.viewReadOnly(keys,
new C1<KeyCacheObject, Object>() {
@Override public Object apply(KeyCacheObject key) {
return cctx.unwrapBinaryIfNeeded(key, !convertBinary());
}
});
if (log.isDebugEnabled())
log.debug("Loading values from store for keys: " + keys0);
sessionInit0(tx, StoreOperation.READ, false);
boolean threwEx = true;
try {
IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() {
@SuppressWarnings("ConstantConditions")
@Override public void apply(Object k, Object val) {
if (convert) {
Object v = convert(val);
vis.apply(cctx.toCacheKeyObject(k), v);
}
else {
IgniteBiTuple<Object, GridCacheVersion> v = (IgniteBiTuple<Object, GridCacheVersion>)val;
if (v != null)
verVis.apply(cctx.toCacheKeyObject(k), v.get1(), v.get2());
}
}
};
if (keys.size() > singleThreadGate.loadAllThreshold()) {
Map<Object, Object> map = store.loadAll(keys0);
if (map != null) {
for (Map.Entry<Object, Object> e : map.entrySet())
c.apply(cctx.toCacheKeyObject(e.getKey()), e.getValue());
}
}
else
singleThreadGate.loadAll(keys0, c);
threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
}
catch (CacheLoaderException e) {
throw new IgniteCheckedException(e);
}
catch (Exception e) {
throw new IgniteCheckedException(new CacheLoaderException(e));
}
finally {
sessionEnd0(tx, threwEx);
}
if (log.isDebugEnabled())
log.debug("Loaded values from store for keys: " + keys0);
}
}
/** {@inheritDoc} */
@Override public final boolean loadCache(final GridInClosure3 vis, Object[] args) throws IgniteCheckedException {
if (store != null) {
if (log.isDebugEnabled())
log.debug("Loading all values from store.");
sessionInit0(null, StoreOperation.READ, false);
boolean threwEx = true;
try {
store.loadCache(new IgniteBiInClosure<Object, Object>() {
@Override public void apply(Object k, Object o) {
Object v;
GridCacheVersion ver = null;
if (locStore) {
IgniteBiTuple<Object, GridCacheVersion> t = (IgniteBiTuple<Object, GridCacheVersion>)o;
v = t.get1();
ver = t.get2();
}
else
v = o;
KeyCacheObject cacheKey = cctx.toCacheKeyObject(k);
vis.apply(cacheKey, v, ver);
}
}, args);
threwEx = false;
}
catch (CacheLoaderException e) {
throw new IgniteCheckedException(e);
}
catch (Exception e) {
throw new IgniteCheckedException(new CacheLoaderException(e));
}
finally {
sessionEnd0(null, threwEx);
}
if (log.isDebugEnabled())
log.debug("Loaded all values from store.");
return true;
}
LT.warn(log, "Calling Cache.loadCache() method will have no effect, " +
"CacheConfiguration.getStore() is not defined for cache: " + cctx.name());
return false;
}
/** {@inheritDoc} */
@Override public final boolean put(@Nullable IgniteInternalTx tx, KeyCacheObject key, CacheObject val, GridCacheVersion ver)
throws IgniteCheckedException {
if (store != null) {
// Never persist internal keys.
if (key instanceof GridCacheInternal)
return true;
Object key0 = cctx.unwrapBinaryIfNeeded(key, !convertBinary());
Object val0 = cctx.unwrapBinaryIfNeeded(val, !convertBinary());
if (log.isDebugEnabled()) {
log.debug(S.toString("Storing value in cache store",
"key", key0, true,
"val", val0, true));
}
sessionInit0(tx, StoreOperation.WRITE, false);
boolean threwEx = true;
try {
store.write(new CacheEntryImpl<>(key0, locStore ? F.t(val0, ver) : val0));
threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
}
catch (CacheWriterException e) {
throw new IgniteCheckedException(e);
}
catch (Exception e) {
throw new IgniteCheckedException(new CacheWriterException(e));
}
finally {
sessionEnd0(tx, threwEx);
}
if (log.isDebugEnabled()) {
log.debug(S.toString("Stored value in cache store",
"key", key0, true,
"val", val0, true));
}
return true;
}
return false;
}
/** {@inheritDoc} */
@Override public final boolean putAll(
@Nullable IgniteInternalTx tx,
Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> map
) throws IgniteCheckedException {
if (F.isEmpty(map))
return true;
if (map.size() == 1) {
Map.Entry<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> e =
((Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>>)map).entrySet().iterator().next();
return put(tx, e.getKey(), e.getValue().get1(), e.getValue().get2());
}
else {
if (store != null) {
EntriesView entries = new EntriesView(map);
if (log.isDebugEnabled())
log.debug("Storing values in cache store [entries=" + entries + ']');
sessionInit0(tx, StoreOperation.WRITE, false);
boolean threwEx = true;
try {
store.writeAll(entries);
threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
}
catch (Exception e) {
if (!(e instanceof CacheWriterException))
e = new CacheWriterException(e);
if (!entries.isEmpty()) {
List<Object> keys = new ArrayList<>(entries.size());
for (Cache.Entry<?, ?> entry : entries)
keys.add(entry.getKey());
throw new CacheStorePartialUpdateException(keys, e);
}
throw new IgniteCheckedException(e);
}
finally {
sessionEnd0(tx, threwEx);
}
if (log.isDebugEnabled())
log.debug("Stored value in cache store [entries=" + entries + ']');
return true;
}
return false;
}
}
/** {@inheritDoc} */
@Override public final boolean remove(@Nullable IgniteInternalTx tx, KeyCacheObject key) throws IgniteCheckedException {
if (store != null) {
// Never remove internal key from store as it is never persisted.
if (key instanceof GridCacheInternal)
return false;
Object key0 = cctx.unwrapBinaryIfNeeded(key, !convertBinary());
if (log.isDebugEnabled())
log.debug(S.toString("Removing value from cache store", "key", key0, true));
sessionInit0(tx, StoreOperation.WRITE, false);
boolean threwEx = true;
try {
store.delete(key0);
threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
}
catch (CacheWriterException e) {
throw new IgniteCheckedException(e);
}
catch (Exception e) {
throw new IgniteCheckedException(new CacheWriterException(e));
}
finally {
sessionEnd0(tx, threwEx);
}
if (log.isDebugEnabled())
log.debug(S.toString("Removed value from cache store", "key", key0, true));
return true;
}
return false;
}
/** {@inheritDoc} */
@Override public final boolean removeAll(
@Nullable IgniteInternalTx tx,
Collection<? extends KeyCacheObject> keys
) throws IgniteCheckedException {
if (F.isEmpty(keys))
return true;
if (keys.size() == 1) {
KeyCacheObject key = keys.iterator().next();
return remove(tx, key);
}
if (store != null) {
Collection<Object> keys0 = cctx.unwrapBinariesIfNeeded(keys, !convertBinary());
if (log.isDebugEnabled())
log.debug(S.toString("Removing values from cache store",
"keys", keys0, true));
sessionInit0(tx, StoreOperation.WRITE, false);
boolean threwEx = true;
try {
store.deleteAll(keys0);
threwEx = false;
}
catch (ClassCastException e) {
handleClassCastException(e);
}
catch (Exception e) {
if (!(e instanceof CacheWriterException))
e = new CacheWriterException(e);
if (!keys0.isEmpty())
throw new CacheStorePartialUpdateException(keys0, e);
throw new IgniteCheckedException(e);
}
finally {
sessionEnd0(tx, threwEx);
}
if (log.isDebugEnabled())
log.debug(S.toString("Removed values from cache store",
"keys", keys0, true));
return true;
}
return false;
}
/** {@inheritDoc} */
@Override public CacheStore<Object, Object> store() {
return store;
}
/** {@inheritDoc} */
@Override public void forceFlush() throws IgniteCheckedException {
if (store instanceof GridCacheWriteBehindStore)
((GridCacheWriteBehindStore)store).forceFlush();
}
/** {@inheritDoc} */
@Override public final void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last,
boolean storeSessionEnded) throws IgniteCheckedException {
assert store != null;
sessionInit0(tx, commit? StoreOperation.COMMIT: StoreOperation.ROLLBACK, false);
try {
if (sesLsnrs != null && sesHolder.get().contains(store)) {
for (CacheStoreSessionListener lsnr : sesLsnrs)
lsnr.onSessionEnd(locSes, commit);
}
if (!sesHolder.get().ended(store) && !storeSessionEnded)
store.sessionEnd(commit);
}
catch (Throwable e) {
last = true;
throw e;
}
finally {
if (last && sesHolder != null) {
sesHolder.set(null);
tx.removeMeta(SES_ATTR);
}
}
}
/**
* @param e Class cast exception.
* @throws IgniteCheckedException Thrown exception.
*/
private void handleClassCastException(ClassCastException e) throws IgniteCheckedException {
assert e != null;
if (e.getMessage() != null) {
throw new IgniteCheckedException("Cache store must work with binary objects if binary are " +
"enabled for cache [cacheName=" + cctx.name() + ']', e);
}
else
throw e;
}
/** {@inheritDoc} */
@Override public void writeBehindSessionInit() throws IgniteCheckedException {
sessionInit0(null, null, true);
}
/** {@inheritDoc} */
@Override public void writeBehindCacheStoreSessionListenerStart() throws IgniteCheckedException {
assert sesHolder.get() != null;
notifyCacheStoreSessionListeners(sesHolder.get(), null, true);
}
/** {@inheritDoc} */
@Override public void writeBehindSessionEnd(boolean threwEx) throws IgniteCheckedException {
sessionEnd0(null, threwEx);
}
/**
* @param tx Current transaction.
* @param op Store operation.
* @param writeBehindStoreInitiator {@code true} if method call is initiated by {@link GridCacheWriteBehindStore}.
* @throws IgniteCheckedException If failed.
*/
private void sessionInit0(@Nullable IgniteInternalTx tx, @Nullable StoreOperation op,
boolean writeBehindStoreInitiator) throws IgniteCheckedException {
assert sesHolder != null;
SessionData ses;
if (tx != null) {
ses = tx.meta(SES_ATTR);
if (ses == null) {
ses = new SessionData(tx, cctx.name());
tx.addMeta(SES_ATTR, ses);
}
else
// Session cache name may change in cross-cache transaction.
ses.cacheName(cctx.name());
}
else
ses = new SessionData(null, cctx.name());
sesHolder.set(ses);
notifyCacheStoreSessionListeners(ses, op, writeBehindStoreInitiator);
}
/**
* @param ses Current session.
* @param op Store operation.
* @param writeBehindStoreInitiator {@code True} if method call is initiated by {@link GridCacheWriteBehindStore}.
* @throws IgniteCheckedException If failed.
*/
private void notifyCacheStoreSessionListeners(SessionData ses, @Nullable StoreOperation op,
boolean writeBehindStoreInitiator) throws IgniteCheckedException {
try {
boolean notifyLsnrs = false;
if (writeBehindStoreInitiator)
notifyLsnrs = !ses.started(store) && sesLsnrs != null;
else {
assert op != null;
switch (op) {
case READ:
notifyLsnrs = readThrough && !ses.started(store) && sesLsnrs != null;
break;
case WRITE:
notifyLsnrs = !cacheConfiguration().isWriteBehindEnabled() && writeThrough
&& !ses.started(store) && sesLsnrs != null;
break;
case COMMIT:
case ROLLBACK:
// No needs to start the session (if not started yet) and notify listeners.
break;
default:
assert false : "Unexpected operation: " + op.toString();
}
}
if (notifyLsnrs) {
for (CacheStoreSessionListener lsnr : sesLsnrs)
lsnr.onSessionStart(locSes);
}
}
catch (Exception e) {
throw new IgniteCheckedException("Failed to start store session: " + e, e);
}
}
/**
* Clears session holder.
*/
private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException {
try {
if (tx == null) {
if (sesLsnrs != null && sesHolder.get().contains(store)) {
for (CacheStoreSessionListener lsnr : sesLsnrs)
lsnr.onSessionEnd(locSes, !threwEx);
}
if (!sesHolder.get().ended(store))
store.sessionEnd(!threwEx);
}
}
catch (Exception e) {
if (!threwEx)
throw U.cast(e);
}
finally {
if (sesHolder != null)
sesHolder.set(null);
}
}
/**
* @return Ignite context.
*/
protected abstract GridKernalContext igniteContext();
/**
* @return Cache configuration.
*/
protected abstract CacheConfiguration cacheConfiguration();
/**
*
*/
private static class SessionData {
/** */
@GridToStringExclude
private final TxProxy tx;
/** */
private String cacheName;
/** */
@GridToStringInclude
private Map<Object, Object> props;
/** */
private Object attach;
/** */
private final Set<CacheStore> started =
new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>());
/**
* @param tx Current transaction.
* @param cacheName Cache name.
*/
private SessionData(@Nullable final IgniteInternalTx tx, @Nullable String cacheName) {
this.tx = tx != null ? new TxProxy(tx) : null;
this.cacheName = cacheName;
}
/**
* @return Transaction.
*/
@Nullable private Transaction transaction() {
return tx;
}
/**
* @return Properties.
*/
private Map<Object, Object> properties() {
if (props == null)
props = new GridLeanMap<>();
return props;
}
/**
* @param attach Attachment.
*/
private Object attach(Object attach) {
Object prev = this.attach;
this.attach = attach;
return prev;
}
/**
* @return Attachment.
*/
private Object attachment() {
return attach;
}
/**
* @return Cache name.
*/
private String cacheName() {
return cacheName;
}
/**
* @param cacheName Cache name.
*/
private void cacheName(String cacheName) {
this.cacheName = cacheName;
}
/**
* @return If session is started.
*/
private boolean started(CacheStore store) {
return !started.add(store);
}
/**
* @param store Cache store.
* @return Whether session already ended on this store instance.
*/
private boolean ended(CacheStore store) {
return !started.remove(store);
}
/**
* @param store Cache store.
* @return {@code True} if session started.
*/
private boolean contains(CacheStore store) {
return started.contains(store);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SessionData.class, this, "tx", CU.txString(tx != null ? tx.tx : null));
}
}
/**
*
*/
private static class ThreadLocalSession implements CacheStoreSession {
/** */
private final ThreadLocal<SessionData> sesHolder;
/**
* @param sesHolder Session holder.
*/
private ThreadLocalSession(ThreadLocal<SessionData> sesHolder) {
this.sesHolder = sesHolder;
}
/** {@inheritDoc} */
@Nullable @Override public Transaction transaction() {
SessionData ses0 = sesHolder.get();
return ses0 != null ? ses0.transaction() : null;
}
/** {@inheritDoc} */
@Override public boolean isWithinTransaction() {
return transaction() != null;
}
/** {@inheritDoc} */
@Override public Object attach(@Nullable Object attachment) {
SessionData ses0 = sesHolder.get();
return ses0 != null ? ses0.attach(attachment) : null;
}
/** {@inheritDoc} */
@Nullable @Override public <T> T attachment() {
SessionData ses0 = sesHolder.get();
return ses0 != null ? (T)ses0.attachment() : null;
}
/** {@inheritDoc} */
@Override public <K1, V1> Map<K1, V1> properties() {
SessionData ses0 = sesHolder.get();
return ses0 != null ? (Map<K1, V1>)ses0.properties() : null;
}
/** {@inheritDoc} */
@Nullable @Override public String cacheName() {
SessionData ses0 = sesHolder.get();
return ses0 != null ? ses0.cacheName() : null;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ThreadLocalSession.class, this);
}
}
/**
*
*/
private class EntriesView extends AbstractCollection<Cache.Entry<?, ?>> {
/** */
private final Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> map;
/** */
private Set<Object> rmvd;
/** */
private boolean cleared;
/**
* @param map Map.
*/
private EntriesView(Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> map) {
assert map != null;
this.map = map;
}
/** {@inheritDoc} */
@Override public int size() {
return cleared ? 0 : (map.size() - (rmvd != null ? rmvd.size() : 0));
}
/** {@inheritDoc} */
@Override public boolean isEmpty() {
return cleared || !iterator().hasNext();
}
/** {@inheritDoc} */
@Override public boolean contains(Object o) {
if (cleared || !(o instanceof Cache.Entry))
return false;
Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o;
return map.containsKey(e.getKey());
}
/** {@inheritDoc} */
@NotNull @Override public Iterator<Cache.Entry<?, ?>> iterator() {
if (cleared)
return new GridEmptyIterator<>();
final Iterator<Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>>> it0 = (Iterator)map.entrySet().iterator();
return new Iterator<Cache.Entry<?, ?>>() {
/** */
private Cache.Entry<?, ?> cur;
/** */
private Cache.Entry<?, ?> next;
/**
*
*/
{
checkNext();
}
/**
*
*/
private void checkNext() {
while (it0.hasNext()) {
Map.Entry<?, IgniteBiTuple<?, GridCacheVersion>> e = it0.next();
Object k = e.getKey();
Object v = locStore ? e.getValue() : e.getValue().get1();
k = cctx.unwrapBinaryIfNeeded(k, !convertBinary());
v = cctx.unwrapBinaryIfNeeded(v, !convertBinary());
if (rmvd != null && rmvd.contains(k))
continue;
next = new CacheEntryImpl<>(k, v);
break;
}
}
@Override public boolean hasNext() {
return next != null;
}
@Override public Cache.Entry<?, ?> next() {
if (next == null)
throw new NoSuchElementException();
cur = next;
next = null;
checkNext();
return cur;
}
@Override public void remove() {
if (cur == null)
throw new IllegalStateException();
addRemoved(cur);
cur = null;
}
};
}
/** {@inheritDoc} */
@Override public boolean add(Cache.Entry<?, ?> entry) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public boolean addAll(Collection<? extends Cache.Entry<?, ?>> col) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public boolean remove(Object o) {
if (cleared || !(o instanceof Cache.Entry))
return false;
Cache.Entry<?, ?> e = (Cache.Entry<?, ?>)o;
if (rmvd != null && rmvd.contains(e.getKey()))
return false;
if (mapContains(e)) {
addRemoved(e);
return true;
}
return false;
}
/** {@inheritDoc} */
@Override public boolean containsAll(Collection<?> col) {
if (cleared)
return false;
for (Object o : col) {
if (contains(o))
return false;
}
return true;
}
/** {@inheritDoc} */
@Override public boolean removeAll(Collection<?> col) {
if (cleared)
return false;
boolean modified = false;
for (Object o : col) {
if (remove(o))
modified = true;
}
return modified;
}
/** {@inheritDoc} */
@Override public boolean retainAll(Collection<?> col) {
if (cleared)
return false;
boolean modified = false;
for (Cache.Entry<?, ?> e : this) {
if (!col.contains(e)) {
addRemoved(e);
modified = true;
}
}
return modified;
}
/** {@inheritDoc} */
@Override public void clear() {
cleared = true;
}
/**
* @param e Entry.
*/
private void addRemoved(Cache.Entry<?, ?> e) {
if (rmvd == null)
rmvd = new HashSet<>();
rmvd.add(e.getKey());
}
/**
* @param e Entry.
* @return {@code True} if original map contains entry.
*/
private boolean mapContains(Cache.Entry<?, ?> e) {
return map.containsKey(e.getKey());
}
/** {@inheritDoc} */
public String toString() {
if (!S.INCLUDE_SENSITIVE)
return "[size=" + size() + "]";
Iterator<Cache.Entry<?, ?>> it = iterator();
if (!it.hasNext())
return "[]";
SB sb = new SB("[");
while (true) {
Cache.Entry<?, ?> e = it.next();
sb.a(e.toString());
if (!it.hasNext())
return sb.a(']').toString();
sb.a(", ");
}
}
}
/**
*
*/
private static class TxProxy implements Transaction {
/** */
private final IgniteInternalTx tx;
/**
* @param tx Transaction.
*/
TxProxy(IgniteInternalTx tx) {
assert tx != null;
this.tx = tx;
}
/** {@inheritDoc} */
@Override public IgniteUuid xid() {
return tx.xid();
}
/** {@inheritDoc} */
@Override public UUID nodeId() {
return tx.nodeId();
}
/** {@inheritDoc} */
@Override public long threadId() {
return tx.threadId();
}
/** {@inheritDoc} */
@Override public long startTime() {
return tx.startTime();
}
/** {@inheritDoc} */
@Override public TransactionIsolation isolation() {
return tx.isolation();
}
/** {@inheritDoc} */
@Override public TransactionConcurrency concurrency() {
return tx.concurrency();
}
/** {@inheritDoc} */
@Override public boolean implicit() {
return tx.implicit();
}
/** {@inheritDoc} */
@Override public boolean isInvalidate() {
return tx.isInvalidate();
}
/** {@inheritDoc} */
@Override public TransactionState state() {
return tx.state();
}
/** {@inheritDoc} */
@Override public void suspend() throws IgniteException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Nullable @Override public String label() {
return null;
}
/** {@inheritDoc} */
@Override public long timeout() {
return tx.timeout();
}
/** {@inheritDoc} */
@Override public long timeout(long timeout) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public boolean setRollbackOnly() {
return tx.setRollbackOnly();
}
/** {@inheritDoc} */
@Override public boolean isRollbackOnly() {
return tx.isRollbackOnly();
}
/** {@inheritDoc} */
@Override public void commit() throws IgniteException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public IgniteFuture<Void> commitAsync() throws IgniteException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public void close() throws IgniteException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public void rollback() throws IgniteException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public IgniteFuture<Void> rollbackAsync() throws IgniteException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public void resume() throws IgniteException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public IgniteAsyncSupport withAsync() {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public boolean isAsync() {
return false;
}
/** {@inheritDoc} */
@Override public <R> IgniteFuture<R> future() {
throw new UnsupportedOperationException();
}
}
/** Enumeration that represents possible operations on the underlying store. */
private enum StoreOperation {
/** Read key-value pair from the underlying store. */
READ,
/** Update or remove key from the underlying store. */
WRITE,
/** Commit changes to the underlying store. */
COMMIT,
/** Rollback changes to the underlying store. */
ROLLBACK
}
}