blob: d9120ababfdd72c1d2778d43f8f6d1c1c36f3a06 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
import static javax.cache.Cache.Entry;
import static org.apache.ignite.internal.util.tostring.GridToStringBuilder.includeSensitive;
/**
 * Internal wrapper for a {@link CacheStore} that enables write-behind logic.
 * <p/>
 * The general purpose of this approach is to reduce cache store load under a high
 * store update rate. The idea is to cache all write and remove operations in a pending
 * map and delegate these changes to the underlying store either after a timeout or
 * when the size of the pending map exceeds some pre-configured value. Another performance gain
 * is achieved by combining a group of similar operations into a single batch update.
 * <p/>
 * The essential flush size for the write-behind cache should be at least the estimated
 * count of simultaneously written keys. With a significantly smaller value, a lot of
 * flush events would be triggered, resulting in a high cache store load.
 * <p/>
 * Since write operations to the cache store are deferred, transaction support is lost; no
 * transaction objects are passed to the underlying store.
 * <p/>
 * {@link GridCacheWriteBehindStore} doesn't support concurrent modifications of the same key.
 */
public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware {
/** Default write cache initial capacity. */
public static final int DFLT_INITIAL_CAPACITY = 1024;
/** Overflow ratio used to derive the critical cache size from the flush size. */
public static final float CACHE_OVERFLOW_RATIO = 1.5f;
/** Default concurrency level of write cache. */
public static final int DFLT_CONCUR_LVL = 64;
/** Write cache initial capacity. */
private int initCap = DFLT_INITIAL_CAPACITY;
/** Concurrency level for write cache access. */
private int concurLvl = DFLT_CONCUR_LVL;
/** When cache size exceeds this value, the eldest entry will be stored to the underlying store. */
private int cacheMaxSize = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE;
/** Critical cache size. If cache size exceeds this value, data flush is performed synchronously. */
private int cacheCriticalSize;
/** Count of worker threads performing underlying store updates. */
private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT;
/** Flag indicating that the flush thread count is a power of two. */
private boolean flushThreadCntIsPowerOfTwo;
/** Cache flush frequency. All pending operations will be performed in not less than this value, in milliseconds. */
private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY;
/** Maximum batch size for put and remove operations. */
private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE;
/** Ignite instance name. */
private final String igniteInstanceName;
/** Cache name. */
private final String cacheName;
/** Underlying store. */
private final CacheStore<K, V> store;
/** Write cache; created in {@code start()} only when write coalescing is enabled. */
private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache;
/** Flusher threads. */
private Flusher[] flushThreads;
/** Write coalescing. */
private boolean writeCoalescing = CacheConfiguration.DFLT_WRITE_BEHIND_COALESCING;
/** Atomic flag indicating store shutdown. Initially {@code true}, i.e. the store is not started. */
private AtomicBoolean stopping = new AtomicBoolean(true);
/** Variable for counting total cache overflows. */
private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger();
/** Variable contains current number of overflow events. */
private AtomicInteger cacheOverflowCntr = new AtomicInteger();
/** Variable for counting key-value pairs that are in {@link ValueStatus#RETRY} state. */
private AtomicInteger retryEntriesCnt = new AtomicInteger();
/** Log. */
private final IgniteLogger log;
/** Store manager. */
private final CacheStoreManager storeMgr;
/** Flush lock. */
private final Lock flushLock = new ReentrantLock();
/** Condition (bound to {@link #flushLock}) to determine records available for flush. */
private Condition canFlush = flushLock.newCondition();
/**
 * Creates a write-behind cache store for the given store.
 *
 * @param storeMgr Store manager.
 * @param igniteInstanceName Ignite instance name.
 * @param cacheName Cache name.
 * @param log Grid logger.
 * @param store {@code GridCacheStore} that needs to be wrapped.
 */
public GridCacheWriteBehindStore(
    CacheStoreManager storeMgr,
    String igniteInstanceName,
    String cacheName,
    IgniteLogger log,
    CacheStore<K, V> store) {
    this.storeMgr = storeMgr;
    this.igniteInstanceName = igniteInstanceName;
    this.cacheName = cacheName;
    this.log = log;
    // The final 'store' field must be assigned here; every load/write delegates to it.
    this.store = store;
}
* Sets initial capacity for the write cache.
* @param initCap Initial capacity.
public void setInitialCapacity(int initCap) {
this.initCap = initCap;
* Sets concurrency level for the write cache. Concurrency level is expected count of concurrent threads
* attempting to update cache.
* @param concurLvl Concurrency level.
public void setConcurrencyLevel(int concurLvl) {
this.concurLvl = concurLvl;
* Sets the maximum size of the write cache. When the count of unique keys in write cache exceeds this value,
* the eldest entry in the cache is immediately scheduled for write to the underlying store.
* @param cacheMaxSize Max cache size.
public void setFlushSize(int cacheMaxSize) {
this.cacheMaxSize = cacheMaxSize;
* Gets the maximum size of the write-behind buffer. When the count of unique keys
* in write buffer exceeds this value, the buffer is scheduled for write to the underlying store.
* <p/>
* If this value is {@code 0}, then flush is performed only on time-elapsing basis. However,
* when this value is {@code 0}, the cache critical size is set to
* {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}.
* @return Buffer size that triggers flush procedure.
public int getWriteBehindFlushSize() {
return cacheMaxSize;
* Sets the number of threads that will perform store update operations.
* @param flushThreadCnt Count of worker threads.
public void setFlushThreadCount(int flushThreadCnt) {
this.flushThreadCnt = flushThreadCnt;
this.flushThreadCntIsPowerOfTwo = U.isPow2(flushThreadCnt);
* Gets the number of flush threads that will perform store update operations.
* @return Count of worker threads.
public int getWriteBehindFlushThreadCount() {
return flushThreadCnt;
* Sets the write coalescing flag.
* @param writeCoalescing Write coalescing flag.
public void setWriteCoalescing(boolean writeCoalescing) {
this.writeCoalescing = writeCoalescing;
* Gets the write coalescing flag.
* @return Write coalescing flag.
public boolean getWriteCoalescing() {
return writeCoalescing;
* Sets the cache flush frequency. All pending operations on the underlying store will be performed
* within time interval not less then this value.
* @param cacheFlushFreq Time interval value in milliseconds.
public void setFlushFrequency(long cacheFlushFreq) {
this.cacheFlushFreq = cacheFlushFreq;
* Gets the cache flush frequency. All pending operations on the underlying store will be performed
* within time interval not less then this value.
* <p/>
* If this value is {@code 0}, then flush is performed only when buffer size exceeds flush size.
* @return Flush frequency in milliseconds.
public long getWriteBehindFlushFrequency() {
return cacheFlushFreq;
* Sets the maximum count of similar operations that can be grouped to a single batch.
* @param batchSize Maximum count of batch.
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
* Gets the maximum count of similar (put or remove) operations that can be grouped to a single batch.
* @return Maximum size of batch.
public int getWriteBehindStoreBatchSize() {
return batchSize;
* Gets count of entries that were processed by the write-behind store and have not been
* flushed to the underlying store yet.
* @return Total count of entries in cache store internal buffer.
public int getWriteBehindBufferSize() {
if (writeCoalescing)
return writeCache.sizex();
else {
int size = 0;
for (Flusher f : flushThreads)
size += f.size();
return size;
* @return Underlying store.
public CacheStore<K, V> store() {
return store;
/**
 * Performs all the initialization logic for write-behind cache store.
 * This class must not be used until this method returns.
 */
@Override public void start() {
// At least one flush trigger (time-based frequency or size-based limit) must be configured.
assert cacheFlushFreq != 0 || cacheMaxSize != 0;
// Initialization runs only when the flag flips from "stopping" to "running".
if (stopping.compareAndSet(true, false)) {
if (log.isDebugEnabled())
log.debug("Starting write-behind store for cache '" + cacheName + '\'');
// Critical size is derived from the flush size; a zero result falls back to the configured default.
cacheCriticalSize = (int)(cacheMaxSize * CACHE_OVERFLOW_RATIO);
if (cacheCriticalSize == 0)
cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE;
flushThreads = new GridCacheWriteBehindStore.Flusher[flushThreadCnt];
// The shared write cache exists only in coalescing mode.
if (writeCoalescing)
writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl);
// NOTE(review): only construction of the flushers is visible here; the call that starts each
// worker and the closing braces are absent from this view — confirm against upstream.
for (int i = 0; i < flushThreads.length; i++) {
flushThreads[i] = new Flusher(igniteInstanceName, "flusher-" + i, log);
* Gets count of write buffer overflow events since initialization. Each overflow event causes
* the ongoing flush operation to be performed synchronously.
* @return Count of cache overflow events since start.
public int getWriteBehindTotalCriticalOverflowCount() {
return cacheTotalOverflowCntr.get();
* Gets count of write buffer overflow events in progress at the moment. Each overflow event causes
* the ongoing flush operation to be performed synchronously.
* @return Count of cache overflow events since start.
public int getWriteBehindCriticalOverflowCount() {
return cacheOverflowCntr.get();
* Gets count of cache entries that are in a store-retry state. An entry is assigned a store-retry state
* when underlying store failed due some reason and cache has enough space to retain this entry till
* the next try.
* @return Count of entries in store-retry state.
public int getWriteBehindErrorRetryCount() {
return retryEntriesCnt.get();
/**
 * Performs shutdown logic for store. No put, get and remove requests will be processed after
 * this method is called.
 */
@Override public void stop() {
// Only the caller that flips the flag from "running" to "stopping" performs the shutdown.
if (stopping.compareAndSet(false, true)) {
if (log.isDebugEnabled())
log.debug("Stopping write-behind store for cache '" + cacheName + '\'');
for (Flusher f : flushThreads) {
// NOTE(review): the statement guarded by this check is not visible in this view; presumably it
// wakes the non-empty flusher so that it can drain its buffer — confirm against upstream.
if (!f.isEmpty())
boolean graceful = true;
// Join every worker; a single unsuccessful join marks the whole shutdown as not graceful.
for (GridWorker worker : flushThreads)
graceful &= U.join(worker, log);
if (!graceful)
log.warning("Write behind store shutdown was aborted.");
/**
 * Forces all collected entries to be flushed to the underlying store.
 *
 * @throws IgniteCheckedException If failed.
 */
public void forceFlush() throws IgniteCheckedException {
// Visits every flusher that still holds buffered entries.
for (Flusher f : flushThreads) {
// NOTE(review): the action taken for a non-empty flusher is not visible in this view — confirm against upstream.
if (!f.isEmpty())
/** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) {
store.loadCache(clo, args);
/** {@inheritDoc} */
@Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
// Keys that have a buffered write-behind operation are answered from the buffer;
// the underlying store is queried once for the 'remaining' keys.
if (log.isDebugEnabled())
log.debug(S.toString("Store load all",
"keys", keys, true));
Map<K, V> loaded = new HashMap<>();
Collection<K> remaining = null;
for (K key : keys) {
StatefulValue<K, V> val;
// NOTE(review): no 'else' is visible between the two assignments below, so as written the
// flusher-map lookup always overwrites the write-cache lookup — confirm against upstream.
if (writeCoalescing)
val = writeCache.get(key);
val = flusher(key).flusherWriteMap.get(key);
if (val != null) {
// NOTE(review): the 'try'/'finally' pair here has no visible lock acquire/release — confirm against upstream.
try {
StoreOperation op;
V value;
// In coalescing mode a queued follow-up operation, when present, takes precedence over the current one.
if (writeCoalescing && val.nextOperation() != null) {
op = val.nextOperation();
value = (op == StoreOperation.PUT) ? val.nextEntry().getValue() : null;
} else {
op = val.operation();
value = (op == StoreOperation.PUT) ? val.entry().getValue() : null;
// A buffered PUT yields its value; a buffered RMV yields no mapping for the key.
if (op == StoreOperation.PUT)
loaded.put(key, value);
assert op == StoreOperation.RMV : op;
finally {
else {
// NOTE(review): the list is created lazily, but the statement adding 'key' to it is not visible in this view.
if (remaining == null)
remaining = new ArrayList<>();
// For items that were not found in queue.
if (remaining != null && !remaining.isEmpty()) {
Map<K, V> loaded0 = store.loadAll(remaining);
// NOTE(review): the statement guarded by this null check is not visible; presumably the
// store result is merged into 'loaded' — confirm against upstream.
if (loaded0 != null)
return loaded;
/** {@inheritDoc} */
@Override public V load(K key) {
// A key with a buffered write-behind operation is answered from the buffer;
// otherwise the request falls through to the underlying store.
if (log.isDebugEnabled())
log.debug(S.toString("Store load",
"key", key, true));
StatefulValue<K, V> val;
// NOTE(review): no 'else' is visible between the two assignments below, so as written the
// flusher-map lookup always overwrites the write-cache lookup — confirm against upstream.
if (writeCoalescing)
val = writeCache.get(key);
val = flusher(key).flusherWriteMap.get(key);
if (val != null) {
// NOTE(review): the 'try'/'finally' pair here has no visible lock acquire/release — confirm against upstream.
try {
StoreOperation op;
V value;
// In coalescing mode a queued follow-up operation, when present, takes precedence over the current one.
if (writeCoalescing && val.nextOperation() != null) {
op = val.nextOperation();
value = (op == StoreOperation.PUT) ? val.nextEntry().getValue() : null;
} else {
op = val.operation();
value = (op == StoreOperation.PUT) ? val.entry().getValue() : null;
// Buffered PUT returns the buffered value; buffered RMV means the key is (about to be) removed.
switch (op) {
case PUT:
return value;
case RMV:
return null;
assert false : "Unexpected operation: " + val.status();
finally {
// No buffered operation for the key: delegate to the underlying store.
return store.load(key);
/** {@inheritDoc} */
@Override public void writeAll(Collection<Entry<? extends K, ? extends V>> entries) {
for (Entry<? extends K, ? extends V> e : entries)
/** {@inheritDoc} */
@Override public void write(Entry<? extends K, ? extends V> entry) {
try {
if (log.isDebugEnabled())
log.debug(S.toString("Store put",
"key", entry.getKey(), true,
"val", entry.getValue(), true));
updateCache(entry.getKey(), entry, StoreOperation.PUT);
catch (IgniteInterruptedCheckedException e) {
throw new CacheWriterException(U.convertExceptionNoWrap(e));
/** {@inheritDoc} */
@Override public void deleteAll(Collection<?> keys) {
for (Object key : keys)
/** {@inheritDoc} */
@Override public void delete(Object key) {
try {
if (log.isDebugEnabled())
log.debug(S.toString("Store remove",
"key", key, true));
updateCache((K)key, null, StoreOperation.RMV);
catch (IgniteInterruptedCheckedException e) {
throw new CacheWriterException(U.convertExceptionNoWrap(e));
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) {
// No-op.
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheWriteBehindStore.class, this);
* Performs flush-consistent cache update for the given key.
* @param key Key for which update is performed.
* @param val New value, may be null for remove operation.
* @param operation Updated value status.
* @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
private void updateCache(K key,
@Nullable Entry<? extends K, ? extends V> val,
StoreOperation operation)
throws IgniteInterruptedCheckedException {
StatefulValue<K, V> newVal = new StatefulValue<>(val, operation);
if (writeCoalescing)
putToWriteCache(key, newVal);
flusher(key).putToFlusherWriteCache(key, newVal);
/**
 * Performs flush-consistent writeCache update for the given key.
 *
 * @param key Key for which update is performed.
 * @param newVal Stateful value to put.
 * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
 */
private void putToWriteCache(
K key,
StatefulValue<K, V> newVal)
throws IgniteInterruptedCheckedException {
StatefulValue<K, V> prev;
assert writeCoalescing : "Unexpected write coalescing.";
// Loops while another value is already mapped to the key; each pass merges 'newVal' into that
// existing value according to its status.
// NOTE(review): the lock handling around the 'try'/'finally' and the statements that leave the
// loop are not visible in this view — confirm against upstream.
while ((prev = writeCache.putIfAbsent(key, newVal)) != null) {
try {
if (prev.status() == ValueStatus.PENDING || prev.status() == ValueStatus.PENDING_AND_UPDATED) {
// Flush process in progress, save next value and update the status.
prev.setNext(newVal.val, newVal.storeOperation);
else if (prev.status() == ValueStatus.FLUSHED)
// This entry was deleted from map before we acquired the lock.
else if (prev.status() == ValueStatus.RETRY)
// New value has come, old value is no longer in RETRY state,
assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY;
// The existing holder is reused: it takes the new entry and operation and becomes NEW again.
prev.update(newVal.val, newVal.operation(), ValueStatus.NEW);
finally {
// Now check the map size
int cacheSize = getWriteBehindBufferSize();
// NOTE(review): the actions for the two size thresholds below are not visible in this view.
if (cacheSize > cacheCriticalSize)
// Perform single store update in the same thread.
else if (cacheMaxSize > 0 && cacheSize > cacheMaxSize)
* Return flusher by key.
* @param key Key for search.
* @return flusher.
private Flusher flusher(K key) {
return flushThreads[resolveFlusherByKeyHash(key.hashCode())];
* Lookup flusher index by provided key hash using
* approach similar to {@link HashMap#hash(Object)}. In case
* <code>size</code> is not a power of 2 we fallback to modulo operation.
* @param hash Object hash.
* @return Calculated flucher index [0..flushThreadCnt).
int resolveFlusherByKeyHash(int hash) {
int h = (hash ^ (hash >>> 16));
return flushThreadCntIsPowerOfTwo
? h & (flushThreadCnt - 1)
: U.hashToIndex(h, flushThreadCnt);
/**
 * Flushes one upcoming value to the underlying store. Called from
 * {@link #updateCache(Object, Entry, StoreOperation)} method in case the current map size exceeds
 * the critical size.
 */
private void flushSingleValue() {
// Scans the write cache for an entry that no other thread has captured and applies it as a
// single-element batch.
// NOTE(review): 'batch' has no visible initializer, and the lock handling, status transition,
// counter updates and loop exits are absent from this view — confirm against upstream.
try {
Map<K, StatefulValue<K, V>> batch;
for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) {
StatefulValue<K, V> val = e.getValue();
try {
ValueStatus status = val.status();
if (acquired(status))
// Another thread is helping us, continue to the next entry.
if (val.status() == ValueStatus.RETRY)
assert retryEntriesCnt.get() >= 0;
batch = Collections.singletonMap(e.getKey(), val);
finally {
if (!batch.isEmpty()) {
// Applied without session initialization and without an associated flusher.
applyBatch(batch, false, null);
finally {
/**
 * Performs batch operation on underlying store.
 *
 * @param valMap Batch map.
 * @param initSes {@code True} if need to initialize session.
 * @param flusher Flusher associated with all keys in the batch (makes sense only when write coalescing is disabled).
 * @return {@code True} if batch was successfully applied, {@code False} otherwise.
 */
private boolean applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes, Flusher flusher) {
// Three phases: (1) build the key -> entry map for the store, (2) call updateStore,
// (3) post-process every value depending on whether the store update succeeded.
// NOTE(review): every 'try'/'finally' below has lost its lock handling, and several branch
// bodies and closing braces are absent from this view — confirm against upstream.
assert valMap.size() <= batchSize;
assert !valMap.isEmpty();
StoreOperation operation = null;
// Construct a map for underlying store
Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size());
for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
StatefulValue<K, V> val = e.getValue();
try {
// The first value defines the batch operation; all others must carry the same one.
if (operation == null)
operation = val.operation();
assert operation == val.operation();
assert val.status() == ValueStatus.PENDING || val.status() == ValueStatus.PENDING_AND_UPDATED;
batch.put(e.getKey(), val.entry());
} finally {
boolean result = updateStore(operation, batch, initSes, flusher);
if (result) {
for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
StatefulValue<K, V> val = e.getValue();
try {
if (writeCoalescing) {
// An update that arrived during the flush is promoted to the current value and scheduled again.
if (val.status() == ValueStatus.PENDING_AND_UPDATED) {
val.update(val.nextEntry(), val.nextOperation(), ValueStatus.NEW);
val.setNext(null, null);
else {
StatefulValue<K, V> prev = writeCache.remove(e.getKey());
// Additional check to ensure consistency.
assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
else {
Flusher f = flusher(e.getKey());
// Can remove using equal because if map contains another similar value it has different state.
f.flusherWriteMap.remove(e.getKey(), e.getValue());
finally {
else {
// Exception occurred, we must set RETRY status
for (StatefulValue<K, V> val : valMap.values()) {
try {
// A newer update supersedes the failed one, so it is promoted instead of being retried.
if (val.status() == ValueStatus.PENDING_AND_UPDATED) {
val.update(val.nextEntry(), val.nextOperation(), ValueStatus.NEW);
val.setNext(null, null);
else {
finally {
return result;
/**
 * Tries to update store with the given values and returns {@code true} in case of success.
 * <p/>
 * If any exception occurs in the underlying store, this method checks the map size.
 * If the map size exceeds the critical value, then it returns {@code true} and the values will
 * be lost. If the map size does not exceed the critical value, it returns {@code false} and
 * the values will be retained in the write cache.
 *
 * @param operation Status indicating operation that should be performed.
 * @param vals Key-Value map.
 * @param initSes {@code True} if need to initialize session.
 * @param flusher Flusher associated with the keys of {@code vals} (when write coalescing is disabled).
 * @return {@code true} if value may be deleted from the write cache,
 *         {@code false} otherwise.
 */
private boolean updateStore(
StoreOperation operation,
Map<K, Entry<? extends K, ? extends V>> vals,
boolean initSes,
Flusher flusher
) {
// NOTE(review): the session handling and the actual store calls for PUT/RMV are not visible in
// this view; only the surrounding control flow and the failure handling remain.
try {
if (storeMgr != null) {
if (initSes)
// Back-pressure mechanism is running.
// Cache store session must be initialized by storeMgr.
boolean threwEx = true;
try {
switch (operation) {
case PUT:
case RMV:
assert false : "Unexpected operation: " + operation;
threwEx = false;
return true;
finally {
if (initSes && storeMgr != null)
catch (Exception e) {
LT.warn(log, e, "Unable to update underlying store: " + store, false, false);
// On failure the values are dropped (reported as "may be deleted") only if the buffer is over
// its critical size or the store is stopping; otherwise they are kept for a retry.
boolean overflow;
// NOTE(review): no 'else' is visible between the two assignments below, so as written the
// flusher-based check always overwrites the write-cache check — confirm against upstream.
if (writeCoalescing)
overflow = writeCache.sizex() > cacheCriticalSize || stopping.get();
overflow = flusher.isOverflowed() || stopping.get();
if (overflow) {
for (Map.Entry<K, Entry<? extends K, ? extends V>> entry : vals.entrySet()) {
Object val = entry.getValue() != null ? entry.getValue().getValue() : null;
// Key and value are logged only when sensitive data output is allowed.
log.error("Failed to update store (value will be lost as current buffer size is greater " +
"than 'cacheCriticalSize' or node has been stopped before store was repaired) [" +
(includeSensitive() ? "key=" + entry.getKey() + ", val=" + val + ", " : "") +
"op=" + operation + "]");
return true;
return false;
/**
 * Wakes up flushing threads if map size exceeded maximum value or in case of shutdown.
 */
private void wakeUp() {
// NOTE(review): both the 'try' and the 'finally' bodies are absent from this view; presumably the
// flush lock is taken, the 'canFlush' condition is signalled, and the lock is released —
// confirm against upstream.
try {
finally {
/**
 * Thread that performs time/size-based flushing of written values to the underlying storage.
 */
private class Flusher extends GridWorker {
/** Queue to flush; {@code null} when write coalescing is enabled. */
private final FastSizeDeque<IgniteBiTuple<K, StatefulValue<K, V>>> queue;
/** Flusher write map; {@code null} when write coalescing is enabled. */
private final ConcurrentHashMap<K, StatefulValue<K, V>> flusherWriteMap;
/** Critical size of flusher local queue. */
private final int flusherCacheCriticalSize;
/** Flusher parked flag. */
private volatile boolean parked;
/** Flusher thread. */
protected Thread thread;
/** Cache flushing frequency in nanoseconds (converted from the millisecond setting). */
protected long cacheFlushFreqNanos = cacheFlushFreq * 1000 * 1000;
/** Writer lock. */
private final Lock flusherWriterLock = new ReentrantLock();
/** Condition (bound to the writer lock) to determine available space for flush. */
private Condition flusherWriterCanWrite = flusherWriterLock.newCondition();
/**
 * Creates a flusher worker. Its critical size is an equal share of the store-wide critical size;
 * the local queue and write map are allocated only when write coalescing is disabled.
 *
 * @param igniteInstanceName Ignite instance name.
 * @param name Worker name.
 * @param log Logger.
 */
protected Flusher(String igniteInstanceName,
    String name,
    IgniteLogger log) {
    super(igniteInstanceName, name, log);

    flusherCacheCriticalSize = cacheCriticalSize / flushThreadCnt;

    assert flusherCacheCriticalSize > batchSize;

    if (writeCoalescing) {
        // In coalescing mode all flushers work on the shared write cache.
        queue = null;
        flusherWriteMap = null;
    }
    else {
        queue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
        flusherWriteMap = new ConcurrentHashMap<>(initCap, 0.75f, concurLvl);
    }
}
/** Start flusher thread. */
protected void start() {
// Wraps this worker into an Ignite thread.
// NOTE(review): the call that actually starts the created thread is not visible in this view — confirm against upstream.
thread = new IgniteThread(this);
/**
 * Performs flush-consistent flusher writeCache update for the given key.
 *
 * @param key Key for which update is performed.
 * @param newVal Stateful value to put.
 * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
 */
private void putToFlusherWriteCache(
K key,
StatefulValue<K, V> newVal
) throws IgniteInterruptedCheckedException {
assert !writeCoalescing : "Unexpected write coalescing.";
// Back-pressure: while the local queue is above its critical size the writer waits for the
// flusher to free space, then the new value is appended to the queue and published in the map.
// NOTE(review): the lock handling around the wait, the interrupt re-throw and the untimed wait
// branch are not visible in this view — confirm against upstream.
if (queue.sizex() > flusherCacheCriticalSize) {
while (queue.sizex() > flusherCacheCriticalSize) {
try {
// Wait for free space in flusher queue
while (queue.sizex() >= flusherCacheCriticalSize && !stopping.get()) {
if (cacheFlushFreq > 0)
flusherWriterCanWrite.await(cacheFlushFreq, TimeUnit.MILLISECONDS);
catch (InterruptedException e) {
if (log.isDebugEnabled())
log.debug("Caught interrupted exception: " + e);
finally {
queue.add(F.t(key, newVal));
flusherWriteMap.put(key, newVal);
* Get overflowed flag.
* @return {@code True} if write behind flusher is overflowed,
* {@code False} otherwise.
public boolean isOverflowed() {
if (writeCoalescing)
return writeCache.sizex() > cacheCriticalSize;
return queue.sizex() > flusherCacheCriticalSize;
* Get write behind flusher size.
* @return Flusher write behind size.
public int size() {
return writeCoalescing ? writeCache.sizex() : queue.sizex();
* Test if write behind flusher is empty
* @return {@code True} if write behind flusher is empty, {@code False} otherwise
public boolean isEmpty() {
return writeCoalescing ? writeCache.isEmpty() : queue.isEmpty();
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
// The worker keeps looping until the store is stopping AND its buffer is drained.
// NOTE(review): both loop bodies are absent from this view; presumably each iteration awaits
// available operations and then flushes — confirm against upstream.
if (writeCoalescing) {
while (!stopping.get() || writeCache.sizex() > 0) {
else {
while (!stopping.get() || queue.sizex() > 0) {
/**
 * Awaits until enough elements are available in the write cache or the given timeout is over.
 *
 * @throws InterruptedException If awaiting was interrupted.
 */
private void awaitOperationsAvailableCoalescing() throws InterruptedException {
// NOTE(review): the lock handling around 'try'/'finally' and the untimed wait branch are not
// visible in this view — confirm against upstream.
try {
do {
// Wait only while the cache is within the flush size (or size-based flushing is disabled).
if (writeCache.sizex() <= cacheMaxSize || cacheMaxSize == 0) {
if (cacheFlushFreq > 0)
canFlush.await(cacheFlushFreq, TimeUnit.MILLISECONDS);
// Keep waiting while there is nothing to flush and the store is not stopping.
while (writeCache.sizex() == 0 && !stopping.get());
finally {
/**
 * Awaits until enough elements are available in the flusher queue or the given timeout is over.
 *
 * @throws InterruptedException If awaiting was interrupted.
 */
private void awaitOperationsAvailableNonCoalescing() throws InterruptedException {
// NOTE(review): the statements guarded by most checks below (early returns and the park calls)
// are not visible in this view — confirm against upstream.
if (queue.sizex() >= batchSize)
// Published so that wakeUp() knows the flusher is (about to be) waiting.
parked = true;
try {
for (;;) {
if (queue.sizex() >= batchSize)
if (cacheFlushFreq > 0)
if (queue.sizex() > 0)
// Thread.interrupted() also clears the interrupt flag before the exception is thrown.
if (Thread.interrupted())
throw new InterruptedException();
if (stopping.get())
finally {
parked = false;
/**
 * Wakes up the flusher thread.
 */
public void wakeUp() {
// NOTE(review): the action taken when the flusher is parked is not visible in this view;
// presumably the flusher thread is unparked — confirm against upstream.
if (parked)
/**
 * Removes values from the write cache and performs the corresponding operation
 * on the underlying store.
 */
private void flushCacheCoalescing() {
// Walks the write cache, groups consecutive values with the same operation into batches of at
// most 'batchSize' and applies each batch to the underlying store.
// NOTE(review): the iterator advance, the 'case' labels for NEW_BATCH/default, the 'break'
// statements and the lock release are absent from this view — confirm against upstream.
StoreOperation prevOperation = null;
Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(batchSize);
Iterator<Map.Entry<K, StatefulValue<K, V>>> it = writeCache.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<K, StatefulValue<K, V>> e =;
StatefulValue<K, V> val = e.getValue();
if (!val.writeLock().tryLock()) // TODO: stripe write maps to avoid lock contention.
try {
BatchingResult addRes = tryAddStatefulValue(pending, prevOperation, e.getKey(), val);
switch (addRes) {
// No need to test first value in batch
// The current batch is applied and a fresh one is started with the current value.
applyBatch(pending, true, null);
pending = U.newLinkedHashMap(batchSize);
pending.put(e.getKey(), val);
prevOperation = val.operation();
case ADDED:
prevOperation = val.operation();
assert addRes == BatchingResult.SKIPPED : "Unexpected result: " + addRes;
finally {
if (val.writeLock().isHeldByCurrentThread())
// Process the remainder.
if (!pending.isEmpty())
applyBatch(pending, true, null);
/**
 * Removes values from the flusher write queue and performs the corresponding operation
 * on the underlying store.
 */
private void flushCacheNonCoalescing() {
// Repeatedly collects a batch from the head of the queue and applies it; a batch that could not
// be applied is pushed back to the head of the queue in its original order.
// NOTE(review): the tail of the tryAddStatefulValue call, the queue removal, most 'case' labels
// and the writer-signalling code are absent from this view — confirm against upstream.
StoreOperation prevOperation;
Map<K, StatefulValue<K, V>> pending;
IgniteBiTuple<K, StatefulValue<K, V>> tuple;
boolean applied;
while (!queue.isEmpty()) {
pending = U.newLinkedHashMap(batchSize);
prevOperation = null;
boolean needNewBatch = false;
// Collect batch
while (!needNewBatch && (tuple = queue.peek()) != null) {
BatchingResult addRes = tryAddStatefulValue(pending, prevOperation, tuple.getKey(),
switch (addRes) {
case ADDED:
prevOperation = tuple.getValue().operation();
assert false : "Unexpected result: " + addRes;
needNewBatch = true;
prevOperation = null;
assert false : "Unexpected result: " + addRes;
// Process collected batch
applied = applyBatch(pending, true, this);
if (applied) {
// Wake up awaiting writers
try {
finally {
else {
// Return values to queue
ArrayList<Map.Entry<K, StatefulValue<K, V>>> pendingList = new ArrayList(pending.entrySet());
// Iterating backwards while adding to the head restores the original order.
for (int i = pendingList.size() - 1; i >= 0; i--)
queue.addFirst(F.t(pendingList.get(i).getKey(), pendingList.get(i).getValue()));
/**
 * Tries to add a key and stateful value pair into the pending map.
 *
 * @param pending Map to populate.
 * @param prevOperation Operation of the previously added pair, or {@code null} for a new batch.
 * @param key Key to add.
 * @param val Stateful value to add.
 * @return {@code BatchingResult.ADDED} if the pair was successfully added,
 *         {@code BatchingResult.SKIPPED} if the pair cannot be processed by this thread,
 *         {@code BatchingResult.NEW_BATCH} if the pair requires a new batch (pending map) to be added.
 */
public BatchingResult tryAddStatefulValue(
Map<K, StatefulValue<K, V>> pending,
StoreOperation prevOperation,
K key,
StatefulValue<K, V> val
) {
ValueStatus status = val.status();
assert !(pending.isEmpty() && prevOperation != null) : "prev operation cannot be " + prevOperation
+ " if prev map is empty!";
if (acquired(status))
// Another thread is helping us, continue to the next entry.
return BatchingResult.SKIPPED;
// Without coalescing the same key may be queued several times; a repeated key must go to the next batch.
if (!writeCoalescing && pending.containsKey(key))
return BatchingResult.NEW_BATCH;
// NOTE(review): the statement guarded by the RETRY check is not visible in this view; presumably
// the retry counter is decremented — confirm against upstream.
if (status == ValueStatus.RETRY)
assert retryEntriesCnt.get() >= 0;
// A full batch cannot take more values.
if (pending.size() == batchSize)
return BatchingResult.NEW_BATCH;
// We scan for the next operation and apply batch on operation change. Null means new batch.
if (prevOperation != val.operation() && prevOperation != null)
// Operation is changed, so we need to perform a batch.
return BatchingResult.NEW_BATCH;
else {
// NOTE(review): applyBatch asserts a PENDING status, but no status transition is visible here — confirm against upstream.
pending.put(key, val);
return BatchingResult.ADDED;
* For test purposes only.
* @return Write cache for the underlying store operations.
Map<K, StatefulValue<K, V>> writeCache() {
return writeCache;
* For test purposes only.
* @return Flusher maps for the underlying store operations.
Map<K, StatefulValue<K, V>>[] flusherMaps() {
Map<K, StatefulValue<K, V>>[] result = new Map[flushThreadCnt];
for (int i = 0; i < flushThreadCnt; i++)
result[i] = flushThreads[i].flusherWriteMap;
return result;
* Enumeration that represents possible operations on the underlying store.
private enum StoreOperation {
/** Put key-value pair to the underlying store. */
/** Remove key from the underlying store. */
* Enumeration that represents possible states of value in the map.
private enum ValueStatus {
/** Value is scheduled for write or delete from the underlying cache but has not been captured by flusher. */
/** Value is captured by flusher and store operation is performed at the moment. */
* Value is captured by flusher and store operation is performed at the moment.
* New update for the key was stored and waiting for previous store operation.
/** Store operation has failed and it will be re-tried at the next flush. */
/** Store operation succeeded and this value will be removed by flusher. */
* Enumeration that represents possible result of "add to batch" operation.
private enum BatchingResult {
/** Added to batch */
/** Skipped. */
/** Need new batch. */
* Checks if given status indicates pending or complete flush operation.
* @param status Status to check.
* @return {@code true} if status indicates any pending or complete store update operation.
private boolean acquired(ValueStatus status) {
return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED || status == ValueStatus.PENDING_AND_UPDATED;
* A state-value-operation trio.
* @param <K> Key type.
* @param <V> Value type.
private static class StatefulValue<K, V> extends ReentrantReadWriteLock {
/** */
private static final long serialVersionUID = 0L;
/** Value. */
@GridToStringInclude(sensitive = true)
private Entry<? extends K, ? extends V> val;
/** Next value that waiting for a previous store operation. */
@GridToStringInclude(sensitive = true)
private Entry<? extends K, ? extends V> nextVal;
/** Store operation. */
private StoreOperation storeOperation;
/** Next store operation. */
private StoreOperation nextStoreOperation;
/** Value status. */
private ValueStatus valStatus;
/** Condition to wait for flush event */
private Condition flushCond = writeLock().newCondition();
* Creates a state-value pair with {@link ValueStatus#NEW} status.
* @param val Value.
* @param storeOperation Store operation.
private StatefulValue(Entry<? extends K, ? extends V> val, StoreOperation storeOperation) {
assert storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV;
this.val = val;
this.storeOperation = storeOperation;
valStatus = ValueStatus.NEW;
* @return Next stored value.
private Entry<? extends K, ? extends V> nextEntry() {
return nextVal;
* @return Store operation.
private StoreOperation nextOperation() {
return nextStoreOperation;
* @return Stored value.
private Entry<? extends K, ? extends V> entry() {
return val;
* @return Store operation.
private StoreOperation operation() {
return storeOperation;
* @return Value status.
private ValueStatus status() {
return valStatus;
* Updates value status.
* @param valStatus Value status.
private void status(ValueStatus valStatus) {
this.valStatus = valStatus;
* Updates both value and value status.
* @param val Value.
* @param storeOperation Store operation.
* @param valStatus Value status.
private void update(@Nullable Entry<? extends K, ? extends V> val,
StoreOperation storeOperation,
ValueStatus valStatus) {
this.val = val;
this.storeOperation = storeOperation;
this.valStatus = valStatus;
* Added next value that waiting for a previous store operation.
* @param val Value.
* @param storeOperation Store operation.
private void setNext(@Nullable Entry<? extends K, ? extends V> val,
StoreOperation storeOperation) {
this.nextVal = val;
this.nextStoreOperation = storeOperation;
* Awaits a signal on flush condition.
* @throws IgniteInterruptedCheckedException If thread was interrupted.
private void waitForFlush() throws IgniteInterruptedCheckedException {
* Signals flush condition.
private void signalFlushed() {
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof StatefulValue))
return false;
StatefulValue other = (StatefulValue)o;
return F.eq(val, other.val) && F.eq(valStatus, other.valStatus);
/** {@inheritDoc} */
@Override public int hashCode() {
int res = val != null ? val.hashCode() : 0;
res = 31 * res + valStatus.hashCode();
return res;
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(StatefulValue.class, this);