/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.store;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
import static javax.cache.Cache.Entry;
/**
* Internal wrapper for a {@link CacheStore} that enables write-behind logic.
* <p/>
* The general purpose of this approach is to reduce the cache store load under a high
* store update rate. The idea is to cache all write and remove operations in a pending
* map and delegate these changes to the underlying store either after a timeout or
* when the size of the pending map exceeds a pre-configured value. Another performance gain
* comes from combining a group of similar operations into a single batch update.
* <p/>
* The flush size for the write-behind cache should be at least the estimated count of
* simultaneously written keys. A significantly smaller value triggers frequent flush events
* and results in a high cache store load.
* <p/>
* Since write operations to the cache store are deferred, transaction support is lost; no
* transaction objects are passed to the underlying store.
* <p/>
* {@link GridCacheWriteBehindStore} doesn't support concurrent modifications of the same key.
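* <p/>
* For illustration only, a minimal configuration sketch that enables this wrapper through the
* public {@link CacheConfiguration} API ({@code MyStoreFactory} is a hypothetical placeholder
* for a {@code javax.cache.configuration.Factory<CacheStore>}):
* <pre>{@code
* CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>("myCache");
*
* ccfg.setCacheStoreFactory(new MyStoreFactory()); // hypothetical store factory
* ccfg.setWriteThrough(true);
* ccfg.setWriteBehindEnabled(true);          // wraps the configured store with this class
* ccfg.setWriteBehindFlushSize(10_240);      // flush once 10240 unique keys are buffered
* ccfg.setWriteBehindFlushFrequency(5_000);  // or at least every 5 seconds
* ccfg.setWriteBehindFlushThreadCount(2);    // two flusher threads
* ccfg.setWriteBehindBatchSize(512);         // group up to 512 similar operations per store call
* }</pre>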
*/
public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware {
/** Default write cache initial capacity. */
public static final int DFLT_INITIAL_CAPACITY = 1024;
/** Overflow ratio for critical cache size calculation. */
public static final float CACHE_OVERFLOW_RATIO = 1.5f;
/** Default concurrency level of write cache. */
public static final int DFLT_CONCUR_LVL = 64;
/** Write cache initial capacity. */
private int initCap = DFLT_INITIAL_CAPACITY;
/** Concurrency level for write cache access. */
private int concurLvl = DFLT_CONCUR_LVL;
/** When the cache size exceeds this value, the eldest entry is written to the underlying store. */
private int cacheMaxSize = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE;
/** Critical cache size. If the cache size exceeds this value, data flush is performed synchronously. */
private int cacheCriticalSize;
/** Count of worker threads performing underlying store updates. */
private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT;
/** Is flush threads count power of two flag. */
private boolean flushThreadCntIsPowerOfTwo;
/** Cache flush frequency. All pending operations are performed within an interval not less than this value, in milliseconds. */
private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY;
/** Maximum batch size for put and remove operations */
private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE;
/** Ignite instance name. */
private final String igniteInstanceName;
/** Cache name. */
private final String cacheName;
/** Underlying store. */
private final CacheStore<K, V> store;
/** Write cache. */
private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache;
/** Flusher threads. */
private Flusher[] flushThreads;
/** Write coalescing. */
private boolean writeCoalescing = CacheConfiguration.DFLT_WRITE_BEHIND_COALESCING;
/** Atomic flag indicating store shutdown. */
private AtomicBoolean stopping = new AtomicBoolean(true);
/** Variable for counting total cache overflows. */
private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger();
/** Variable contains current number of overflow events. */
private AtomicInteger cacheOverflowCntr = new AtomicInteger();
/** Variable for counting key-value pairs that are in {@link ValueStatus#RETRY} state. */
private AtomicInteger retryEntriesCnt = new AtomicInteger();
/** Log. */
private final IgniteLogger log;
/** Store manager. */
private final CacheStoreManager storeMgr;
/** Flush lock. */
private final Lock flushLock = new ReentrantLock();
/** Condition to determine records available for flush. */
private Condition canFlush = flushLock.newCondition();
/**
* Creates a write-behind cache store for the given store.
*
* @param storeMgr Store manager.
* @param igniteInstanceName Ignite instance name.
* @param cacheName Cache name.
* @param log Grid logger.
* @param store {@code CacheStore} that needs to be wrapped.
*/
public GridCacheWriteBehindStore(
CacheStoreManager storeMgr,
String igniteInstanceName,
String cacheName,
IgniteLogger log,
CacheStore<K, V> store) {
this.storeMgr = storeMgr;
this.igniteInstanceName = igniteInstanceName;
this.cacheName = cacheName;
this.log = log;
this.store = store;
}
/**
* Sets initial capacity for the write cache.
*
* @param initCap Initial capacity.
*/
public void setInitialCapacity(int initCap) {
this.initCap = initCap;
}
/**
* Sets the concurrency level for the write cache. The concurrency level is the expected count of
* concurrent threads attempting to update the cache.
*
* @param concurLvl Concurrency level.
*/
public void setConcurrencyLevel(int concurLvl) {
this.concurLvl = concurLvl;
}
/**
* Sets the maximum size of the write cache. When the count of unique keys in write cache exceeds this value,
* the eldest entry in the cache is immediately scheduled for write to the underlying store.
*
* @param cacheMaxSize Max cache size.
*/
public void setFlushSize(int cacheMaxSize) {
this.cacheMaxSize = cacheMaxSize;
}
/**
* Gets the maximum size of the write-behind buffer. When the count of unique keys
* in write buffer exceeds this value, the buffer is scheduled for write to the underlying store.
* <p/>
* If this value is {@code 0}, then flush is performed only on time-elapsing basis. However,
* when this value is {@code 0}, the cache critical size is set to
* {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}.
*
* @return Buffer size that triggers flush procedure.
*/
public int getWriteBehindFlushSize() {
return cacheMaxSize;
}
/**
* Sets the number of threads that will perform store update operations.
*
* @param flushThreadCnt Count of worker threads.
*/
public void setFlushThreadCount(int flushThreadCnt) {
this.flushThreadCnt = flushThreadCnt;
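// Bit trick: n is a power of two iff (n & (n - 1)) == 0 (note this also holds for n == 0).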
this.flushThreadCntIsPowerOfTwo = (flushThreadCnt & (flushThreadCnt - 1)) == 0;
}
/**
* Gets the number of flush threads that will perform store update operations.
*
* @return Count of worker threads.
*/
public int getWriteBehindFlushThreadCount() {
return flushThreadCnt;
}
/**
* Sets the write coalescing flag.
*
* @param writeCoalescing Write coalescing flag.
*/
public void setWriteCoalescing(boolean writeCoalescing) {
this.writeCoalescing = writeCoalescing;
}
/**
* Gets the write coalescing flag.
*
* @return Write coalescing flag.
*/
public boolean getWriteCoalescing() {
return writeCoalescing;
}
/**
* Sets the cache flush frequency. All pending operations on the underlying store will be performed
* within a time interval not less than this value.
*
* @param cacheFlushFreq Time interval value in milliseconds.
*/
public void setFlushFrequency(long cacheFlushFreq) {
this.cacheFlushFreq = cacheFlushFreq;
}
/**
* Gets the cache flush frequency. All pending operations on the underlying store will be performed
* within a time interval not less than this value.
* <p/>
* If this value is {@code 0}, then flush is performed only when buffer size exceeds flush size.
*
* @return Flush frequency in milliseconds.
*/
public long getWriteBehindFlushFrequency() {
return cacheFlushFreq;
}
/**
* Sets the maximum count of similar operations that can be grouped to a single batch.
*
* @param batchSize Maximum batch size.
*/
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
/**
* Gets the maximum count of similar (put or remove) operations that can be grouped to a single batch.
*
* @return Maximum size of batch.
*/
public int getWriteBehindStoreBatchSize() {
return batchSize;
}
/**
* Gets count of entries that were processed by the write-behind store and have not been
* flushed to the underlying store yet.
*
* @return Total count of entries in cache store internal buffer.
*/
public int getWriteBehindBufferSize() {
if (writeCoalescing)
return writeCache.sizex();
else {
int size = 0;
for (Flusher f : flushThreads)
size += f.size();
return size;
}
}
/**
* @return Underlying store.
*/
public CacheStore<K, V> store() {
return store;
}
/**
* Performs all the initialization logic for write-behind cache store.
* This class must not be used until this method returns.
*/
@Override public void start() {
assert cacheFlushFreq != 0 || cacheMaxSize != 0;
if (stopping.compareAndSet(true, false)) {
if (log.isDebugEnabled())
log.debug("Starting write-behind store for cache '" + cacheName + '\'');
cacheCriticalSize = (int)(cacheMaxSize * CACHE_OVERFLOW_RATIO);
if (cacheCriticalSize == 0)
cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE;
flushThreads = new GridCacheWriteBehindStore.Flusher[flushThreadCnt];
if (writeCoalescing)
writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl);
for (int i = 0; i < flushThreads.length; i++) {
flushThreads[i] = new Flusher(igniteInstanceName, "flusher-" + i, log);
flushThreads[i].start();
}
}
}
/**
* Gets count of write buffer overflow events since initialization. Each overflow event causes
* the ongoing flush operation to be performed synchronously.
*
* @return Count of cache overflow events since start.
*/
public int getWriteBehindTotalCriticalOverflowCount() {
return cacheTotalOverflowCntr.get();
}
/**
* Gets count of write buffer overflow events currently in progress. Each overflow event causes
* the ongoing flush operation to be performed synchronously.
*
* @return Count of cache overflow events currently in progress.
*/
public int getWriteBehindCriticalOverflowCount() {
return cacheOverflowCntr.get();
}
/**
* Gets count of cache entries that are in a store-retry state. An entry is assigned a store-retry state
* when the underlying store fails for some reason and the cache has enough space to retain this entry till
* the next try.
*
* @return Count of entries in store-retry state.
*/
public int getWriteBehindErrorRetryCount() {
return retryEntriesCnt.get();
}
/**
* Performs shutdown logic for store. No put, get and remove requests will be processed after
* this method is called.
*/
@Override public void stop() {
if (stopping.compareAndSet(false, true)) {
if (log.isDebugEnabled())
log.debug("Stopping write-behind store for cache '" + cacheName + '\'');
for (Flusher f : flushThreads) {
if (!f.isEmpty())
f.wakeUp();
}
boolean graceful = true;
for (GridWorker worker : flushThreads)
graceful &= U.join(worker, log);
if (!graceful)
log.warning("Write behind store shutdown was aborted.");
}
}
/**
* Forces all entries collected to be flushed to the underlying store.
* @throws IgniteCheckedException If failed.
*/
public void forceFlush() throws IgniteCheckedException {
for (Flusher f : flushThreads) {
if (!f.isEmpty())
f.wakeUp();
}
}
/** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) {
store.loadCache(clo, args);
}
/** {@inheritDoc} */
@Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
if (log.isDebugEnabled())
log.debug(S.toString("Store load all",
"keys", keys, true));
Map<K, V> loaded = new HashMap<>();
Collection<K> remaining = null;
for (K key : keys) {
StatefulValue<K, V> val;
if (writeCoalescing)
val = writeCache.get(key);
else
val = flusher(key).flusherWriteMap.get(key);
if (val != null) {
val.readLock().lock();
try {
if (val.operation() == StoreOperation.PUT)
loaded.put(key, val.entry().getValue());
else
assert val.operation() == StoreOperation.RMV : val.operation();
}
finally {
val.readLock().unlock();
}
}
else {
if (remaining == null)
remaining = new ArrayList<>();
remaining.add(key);
}
}
// For items that were not found in queue.
if (remaining != null && !remaining.isEmpty()) {
Map<K, V> loaded0 = store.loadAll(remaining);
if (loaded0 != null)
loaded.putAll(loaded0);
}
return loaded;
}
/** {@inheritDoc} */
@Override public V load(K key) {
if (log.isDebugEnabled())
log.debug(S.toString("Store load",
"key", key, true));
StatefulValue<K, V> val;
if (writeCoalescing)
val = writeCache.get(key);
else
val = flusher(key).flusherWriteMap.get(key);
if (val != null) {
val.readLock().lock();
try {
switch (val.operation()) {
case PUT:
return val.entry().getValue();
case RMV:
return null;
default:
assert false : "Unexpected operation: " + val.status();
}
}
finally {
val.readLock().unlock();
}
}
return store.load(key);
}
/** {@inheritDoc} */
@Override public void writeAll(Collection<Entry<? extends K, ? extends V>> entries) {
for (Entry<? extends K, ? extends V> e : entries)
write(e);
}
/** {@inheritDoc} */
@Override public void write(Entry<? extends K, ? extends V> entry) {
try {
if (log.isDebugEnabled())
log.debug(S.toString("Store put",
"key", entry.getKey(), true,
"val", entry.getValue(), true));
updateCache(entry.getKey(), entry, StoreOperation.PUT);
}
catch (IgniteInterruptedCheckedException e) {
throw new CacheWriterException(U.convertExceptionNoWrap(e));
}
}
/** {@inheritDoc} */
@Override public void deleteAll(Collection<?> keys) {
for (Object key : keys)
delete(key);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void delete(Object key) {
try {
if (log.isDebugEnabled())
log.debug(S.toString("Store remove",
"key", key, true));
updateCache((K)key, null, StoreOperation.RMV);
}
catch (IgniteInterruptedCheckedException e) {
throw new CacheWriterException(U.convertExceptionNoWrap(e));
}
}
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) {
// No-op.
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheWriteBehindStore.class, this);
}
/**
* Performs flush-consistent cache update for the given key.
*
* @param key Key for which update is performed.
* @param val New value, may be null for remove operation.
* @param operation Store operation to be performed.
* @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
*/
private void updateCache(K key,
@Nullable Entry<? extends K, ? extends V> val,
StoreOperation operation)
throws IgniteInterruptedCheckedException {
StatefulValue<K, V> newVal = new StatefulValue<>(val, operation);
if (writeCoalescing)
putToWriteCache(key, newVal);
else
flusher(key).putToFlusherWriteCache(key, newVal);
}
/**
* Performs flush-consistent writeCache update for the given key.
*
* @param key Key for which update is performed.
* @param newVal Stateful value to put.
* @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
*/
private void putToWriteCache(
K key,
StatefulValue<K, V> newVal)
throws IgniteInterruptedCheckedException {
StatefulValue<K, V> prev;
assert writeCoalescing : "Write coalescing must be enabled.";
while ((prev = writeCache.putIfAbsent(key, newVal)) != null) {
prev.writeLock().lock();
try {
if (prev.status() == ValueStatus.PENDING) {
// Flush process in progress, try again.
prev.waitForFlush();
continue;
}
else if (prev.status() == ValueStatus.FLUSHED)
// This entry was deleted from map before we acquired the lock.
continue;
else if (prev.status() == ValueStatus.RETRY)
// New value has come, old value is no longer in RETRY state.
retryEntriesCnt.decrementAndGet();
assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY;
prev.update(newVal.val, newVal.operation(), ValueStatus.NEW);
break;
}
finally {
prev.writeLock().unlock();
}
}
// Now check the map size
int cacheSize = getWriteBehindBufferSize();
if (cacheSize > cacheCriticalSize)
// Perform single store update in the same thread.
flushSingleValue();
else if (cacheMaxSize > 0 && cacheSize > cacheMaxSize)
wakeUp();
}
/**
* Returns the flusher responsible for the given key.
*
* @param key Key to map to a flusher thread.
* @return Flusher that owns the given key.
*/
private Flusher flusher(K key) {
int h, idx;
// Spread the higher hash bits downward (as java.util.HashMap does) before reducing to an index.
if (flushThreadCntIsPowerOfTwo)
idx = ((h = key.hashCode()) ^ (h >>> 16)) & (flushThreadCnt - 1);
else
// A negative hash code would produce a negative remainder, so guard the index with U.safeAbs().
idx = U.safeAbs(((h = key.hashCode()) ^ (h >>> 16)) % flushThreadCnt);
return flushThreads[idx];
}
/**
* Flushes one upcoming value to the underlying store. Called from
* {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds
* critical size.
*/
private void flushSingleValue() {
cacheOverflowCntr.incrementAndGet();
try {
Map<K, StatefulValue<K, V>> batch;
for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) {
StatefulValue<K, V> val = e.getValue();
val.writeLock().lock();
try {
ValueStatus status = val.status();
if (acquired(status))
// Another thread is helping us, continue to the next entry.
continue;
if (val.status() == ValueStatus.RETRY)
retryEntriesCnt.decrementAndGet();
assert retryEntriesCnt.get() >= 0;
val.status(ValueStatus.PENDING);
batch = Collections.singletonMap(e.getKey(), val);
}
finally {
val.writeLock().unlock();
}
if (!batch.isEmpty()) {
applyBatch(batch, false, null);
cacheTotalOverflowCntr.incrementAndGet();
return;
}
}
}
finally {
cacheOverflowCntr.decrementAndGet();
}
}
/**
* Performs batch operation on underlying store.
*
* @param valMap Batch map.
* @param initSes {@code True} if need to initialize session.
* @param flusher Flusher associated with all keys in the batch (only meaningful when write coalescing is disabled).
* @return {@code True} if batch was successfully applied, {@code False} otherwise.
*/
private boolean applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes, Flusher flusher) {
assert valMap.size() <= batchSize;
assert !valMap.isEmpty();
StoreOperation operation = null;
// Construct a map for underlying store
Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size());
for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
if (operation == null)
operation = e.getValue().operation();
assert operation == e.getValue().operation();
assert e.getValue().status() == ValueStatus.PENDING;
batch.put(e.getKey(), e.getValue().entry());
}
boolean result = updateStore(operation, batch, initSes, flusher);
if (result) {
for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
StatefulValue<K, V> val = e.getValue();
val.writeLock().lock();
try {
val.status(ValueStatus.FLUSHED);
if (writeCoalescing) {
StatefulValue<K, V> prev = writeCache.remove(e.getKey());
// Additional check to ensure consistency.
assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
val.signalFlushed();
}
else {
Flusher f = flusher(e.getKey());
// Removal by (key, value) is safe: if the map holds another value for this key, that value has a different state.
f.flusherWriteMap.remove(e.getKey(), e.getValue());
val.signalFlushed();
}
}
finally {
val.writeLock().unlock();
}
}
}
else {
// Exception occurred, we must set RETRY status
for (StatefulValue<K, V> val : valMap.values()) {
val.writeLock().lock();
try {
val.status(ValueStatus.RETRY);
retryEntriesCnt.incrementAndGet();
val.signalFlushed();
}
finally {
val.writeLock().unlock();
}
}
}
return result;
}
/**
* Tries to update store with the given values and returns {@code true} in case of success.
*
* <p/> If an exception occurs in the underlying store, this method checks the map size.
* If the map size exceeds the critical value, it returns {@code true} and the value is
* lost. If the map size does not exceed the critical value, it returns {@code false} and the value is
* retained in the write cache.
*
* @param operation Status indicating operation that should be performed.
* @param vals Key-Value map.
* @param initSes {@code True} if need to initialize session.
* @param flusher Flusher associated with the keys of {@code vals} (only meaningful when write coalescing is disabled).
* @return {@code true} if value may be deleted from the write cache,
* {@code false} otherwise
*/
private boolean updateStore(
StoreOperation operation,
Map<K, Entry<? extends K, ? extends V>> vals,
boolean initSes,
Flusher flusher
) {
try {
if (storeMgr != null) {
if (initSes)
storeMgr.writeBehindSessionInit();
else
// Back-pressure mechanism is running.
// Cache store session must be initialized by storeMgr.
storeMgr.writeBehindCacheStoreSessionListenerStart();
}
boolean threwEx = true;
try {
switch (operation) {
case PUT:
store.writeAll(vals.values());
break;
case RMV:
store.deleteAll(vals.keySet());
break;
default:
assert false : "Unexpected operation: " + operation;
}
threwEx = false;
return true;
}
finally {
if (initSes && storeMgr != null)
storeMgr.writeBehindSessionEnd(threwEx);
}
}
catch (Exception e) {
LT.error(log, e, "Unable to update underlying store: " + store);
boolean overflow;
if (writeCoalescing)
overflow = writeCache.sizex() > cacheCriticalSize || stopping.get();
else
overflow = flusher.isOverflowed() || stopping.get();
if (overflow) {
for (Map.Entry<K, Entry<? extends K, ? extends V>> entry : vals.entrySet()) {
Object val = entry.getValue() != null ? entry.getValue().getValue() : null;
log.warning("Failed to update store (value will be lost as current buffer size is greater " +
"than 'cacheCriticalSize' or node has been stopped before store was repaired) [key=" +
entry.getKey() + ", val=" + val + ", op=" + operation + "]");
}
return true;
}
return false;
}
}
/**
* Wakes up flushing threads if map size exceeded maximum value or in case of shutdown.
*/
private void wakeUp() {
flushLock.lock();
try {
canFlush.signalAll();
}
finally {
flushLock.unlock();
}
}
/**
* Thread that performs time/size-based flushing of written values to the underlying storage.
*/
private class Flusher extends GridWorker {
/** Queue to flush. */
private final FastSizeDeque<IgniteBiTuple<K, StatefulValue<K,V>>> queue;
/** Flusher write map. */
private final ConcurrentHashMap<K, StatefulValue<K,V>> flusherWriteMap;
/** Critical size of flusher local queue. */
private final int flusherCacheCriticalSize;
/** Flusher parked flag. */
private volatile boolean parked;
/** Flusher thread. */
protected Thread thread;
/** Cache flush frequency in nanoseconds. */
protected long cacheFlushFreqNanos = cacheFlushFreq * 1000 * 1000;
/** Writer lock. */
private final Lock flusherWriterLock = new ReentrantLock();
/** Condition to determine available space for flush. */
private Condition flusherWriterCanWrite = flusherWriterLock.newCondition();
/**
* Creates a flusher worker.
*
* @param igniteInstanceName Ignite instance name.
* @param name Flusher thread name.
* @param log Logger.
*/
protected Flusher(String igniteInstanceName,
String name,
IgniteLogger log) {
super(igniteInstanceName, name, log);
flusherCacheCriticalSize = cacheCriticalSize / flushThreadCnt;
assert flusherCacheCriticalSize > batchSize;
if (writeCoalescing) {
queue = null;
flusherWriteMap = null;
}
else {
queue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
flusherWriteMap = new ConcurrentHashMap<>(initCap, 0.75f, concurLvl);
}
}
/** Starts the flusher thread. */
protected void start() {
thread = new IgniteThread(this);
thread.start();
}
/**
* Performs flush-consistent flusher writeCache update for the given key.
*
* @param key Key for which update is performed.
* @param newVal Stateful value to put.
* @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed.
*/
private void putToFlusherWriteCache(
K key,
StatefulValue<K, V> newVal
) throws IgniteInterruptedCheckedException {
assert !writeCoalescing : "Unexpected write coalescing.";
if (queue.sizex() > flusherCacheCriticalSize) {
while (queue.sizex() > flusherCacheCriticalSize) {
wakeUp();
flusherWriterLock.lock();
try {
// Wait for free space in flusher queue
while (queue.sizex() >= flusherCacheCriticalSize && !stopping.get()) {
if (cacheFlushFreq > 0)
flusherWriterCanWrite.await(cacheFlushFreq, TimeUnit.MILLISECONDS);
else
flusherWriterCanWrite.await();
}
cacheTotalOverflowCntr.incrementAndGet();
}
catch (InterruptedException e) {
if (log.isDebugEnabled())
log.debug("Caught interrupted exception: " + e);
Thread.currentThread().interrupt();
}
finally {
flusherWriterLock.unlock();
}
}
cacheTotalOverflowCntr.incrementAndGet();
}
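// Dual bookkeeping: the deque preserves arrival order for the flusher thread, while the
// map serves load() and loadAll() lookups for values that are not yet flushed.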
queue.add(F.t(key, newVal));
flusherWriteMap.put(key, newVal);
}
/**
* Gets the overflow flag.
*
* @return {@code True} if write behind flusher is overflowed,
* {@code False} otherwise.
*/
public boolean isOverflowed() {
if (writeCoalescing)
return writeCache.sizex() > cacheCriticalSize;
else
return queue.sizex() > flusherCacheCriticalSize;
}
/**
* Gets the write-behind flusher size.
*
* @return Flusher write-behind size.
*/
public int size() {
return writeCoalescing ? writeCache.sizex() : queue.sizex();
}
/**
* Tests whether the write-behind flusher is empty.
*
* @return {@code True} if the write-behind flusher is empty, {@code False} otherwise.
*/
public boolean isEmpty() {
return writeCoalescing ? writeCache.isEmpty() : queue.isEmpty();
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
if (writeCoalescing) {
while (!stopping.get() || writeCache.sizex() > 0) {
awaitOperationsAvailableCoalescing();
flushCacheCoalescing();
}
}
else {
while (!stopping.get() || queue.sizex() > 0) {
awaitOperationsAvailableNonCoalescing();
flushCacheNonCoalescing();
}
}
}
/**
* Awaits until enough elements are available in the flusher queue or the given timeout expires.
*
* @throws InterruptedException If awaiting was interrupted.
*/
private void awaitOperationsAvailableCoalescing() throws InterruptedException {
flushLock.lock();
try {
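// Sleep until the buffer reaches the flush size or the flush frequency elapses;
// never return while the buffer is empty, unless the store is stopping.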
do {
if (writeCache.sizex() <= cacheMaxSize || cacheMaxSize == 0) {
if (cacheFlushFreq > 0)
canFlush.await(cacheFlushFreq, TimeUnit.MILLISECONDS);
else
canFlush.await();
}
}
while (writeCache.sizex() == 0 && !stopping.get());
}
finally {
flushLock.unlock();
}
}
/**
* Awaits until enough elements are available in the flusher queue or the given timeout expires.
*
* @throws InterruptedException If awaiting was interrupted.
*/
private void awaitOperationsAvailableNonCoalescing() throws InterruptedException {
if (queue.sizex() >= batchSize)
return;
parked = true;
try {
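// Park until a full batch accumulates, the flush interval elapses, or wakeUp()
// unparks this thread; the queue size is re-checked on every pass.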
for (;;) {
if (queue.sizex() >= batchSize)
return;
if (cacheFlushFreq > 0)
LockSupport.parkNanos(cacheFlushFreqNanos);
else
LockSupport.park();
if (queue.sizex() > 0)
return;
if (Thread.interrupted())
throw new InterruptedException();
if (stopping.get())
return;
}
}
finally {
parked = false;
}
}
/**
* Wake up flusher thread.
*/
public void wakeUp() {
if (parked)
LockSupport.unpark(thread);
}
/**
* Removes values from the write cache and performs corresponding operation
* on the underlying store.
*/
private void flushCacheCoalescing() {
StoreOperation prevOperation = null;
Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(batchSize);
Iterator<Map.Entry<K, StatefulValue<K, V>>> it = writeCache.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<K, StatefulValue<K, V>> e = it.next();
StatefulValue<K, V> val = e.getValue();
if (!val.writeLock().tryLock()) // TODO: stripe write maps to avoid lock contention.
continue;
try {
BatchingResult addRes = tryAddStatefulValue(pending, prevOperation, e.getKey(), val);
switch (addRes) {
case NEW_BATCH:
applyBatch(pending, true, null);
pending = U.newLinkedHashMap(batchSize);
// No need to test first value in batch
val.status(ValueStatus.PENDING);
pending.put(e.getKey(), val);
prevOperation = val.operation();
break;
case ADDED:
prevOperation = val.operation();
break;
default:
assert addRes == BatchingResult.SKIPPED : "Unexpected result: " + addRes;
}
}
finally {
val.writeLock().unlock();
}
}
// Process the remainder.
if (!pending.isEmpty())
applyBatch(pending, true, null);
}
/**
* Removes values from the flusher write queue and performs corresponding operation
* on the underlying store.
*/
private void flushCacheNonCoalescing() {
StoreOperation prevOperation;
Map<K, StatefulValue<K, V>> pending;
IgniteBiTuple<K, StatefulValue<K, V>> tuple;
boolean applied;
while (!queue.isEmpty()) {
pending = U.newLinkedHashMap(batchSize);
prevOperation = null;
boolean needNewBatch = false;
// Collect batch
while (!needNewBatch && (tuple = queue.peek()) != null) {
BatchingResult addRes = tryAddStatefulValue(pending, prevOperation, tuple.getKey(),
tuple.getValue());
switch (addRes) {
case ADDED:
prevOperation = tuple.getValue().operation();
queue.poll();
break;
case SKIPPED:
assert false : "Unexpected result: " + addRes;
break;
case NEW_BATCH:
needNewBatch = true;
prevOperation = null;
break;
default:
assert false : "Unexpected result: " + addRes;
}
}
// Process collected batch
applied = applyBatch(pending, true, this);
if (applied) {
// Wake up awaiting writers
flusherWriterLock.lock();
try {
flusherWriterCanWrite.signalAll();
}
finally {
flusherWriterLock.unlock();
}
}
else {
// Return values to queue
ArrayList<Map.Entry<K, StatefulValue<K,V>>> pendingList = new ArrayList<>(pending.entrySet());
for (int i = pendingList.size() - 1; i >= 0; i--)
queue.addFirst(F.t(pendingList.get(i).getKey(), pendingList.get(i).getValue()));
}
}
}
/**
* Tries to add a key and stateful value pair to the pending map.
*
* @param pending Map to populate.
* @param prevOperation Operation of the entries already in {@code pending} ({@code null} if the map is empty).
* @param key Key to add.
* @param val Stateful value to add.
* @return {@code BatchingResult.ADDED} if the pair was successfully added,
* {@code BatchingResult.SKIPPED} if the pair cannot be processed by this thread,
* {@code BatchingResult.NEW_BATCH} if the pair requires a new batch (pending map).
*/
public BatchingResult tryAddStatefulValue(
Map<K, StatefulValue<K, V>> pending,
StoreOperation prevOperation,
K key,
StatefulValue<K, V> val
) {
ValueStatus status = val.status();
assert !(pending.isEmpty() && prevOperation != null) : "prev operation cannot be " + prevOperation
+ " if prev map is empty!";
if (acquired(status))
// Another thread is helping us, continue to the next entry.
return BatchingResult.SKIPPED;
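// Without coalescing the same key may occur several times in the queue, but a batch (a map)
// can hold it only once, so a duplicate key forces a new batch.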
if (!writeCoalescing && pending.containsKey(key))
return BatchingResult.NEW_BATCH;
if (status == ValueStatus.RETRY)
retryEntriesCnt.decrementAndGet();
assert retryEntriesCnt.get() >= 0;
if (pending.size() == batchSize)
return BatchingResult.NEW_BATCH;
// We scan for the next operation and apply batch on operation change. Null means new batch.
if (prevOperation != val.operation() && prevOperation != null)
// Operation is changed, so we need to perform a batch.
return BatchingResult.NEW_BATCH;
else {
val.status(ValueStatus.PENDING);
pending.put(key, val);
return BatchingResult.ADDED;
}
}
}
/**
* For test purposes only.
*
* @return Write cache for the underlying store operations.
*/
Map<K, StatefulValue<K, V>> writeCache() {
return writeCache;
}
/**
* For test purposes only.
*
* @return Flusher maps for the underlying store operations.
*/
Map<K, StatefulValue<K,V>>[] flusherMaps() {
Map<K, StatefulValue<K,V>>[] result = new Map[flushThreadCnt];
for (int i = 0; i < flushThreadCnt; i++)
result[i] = flushThreads[i].flusherWriteMap;
return result;
}
/**
* Enumeration that represents possible operations on the underlying store.
*/
private enum StoreOperation {
/** Put key-value pair to the underlying store. */
PUT,
/** Remove key from the underlying store. */
RMV
}
/**
* Enumeration that represents possible states of value in the map.
*/
private enum ValueStatus {
/** Value is scheduled for write or delete from the underlying cache but has not been captured by flusher. */
NEW,
/** Value is captured by flusher and store operation is performed at the moment. */
PENDING,
/** Store operation has failed and it will be re-tried at the next flush. */
RETRY,
/** Store operation succeeded and this value will be removed by flusher. */
FLUSHED,
}
/**
* Enumeration that represents possible result of "add to batch" operation.
*/
private enum BatchingResult {
/** Added to batch */
ADDED,
/** Skipped. */
SKIPPED,
/** Need new batch. */
NEW_BATCH
}
/**
* Checks if given status indicates pending or complete flush operation.
*
* @param status Status to check.
* @return {@code true} if status indicates any pending or complete store update operation.
*/
private boolean acquired(ValueStatus status) {
return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED;
}
/**
* A state-value-operation trio.
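* <p/>
* Extends {@link ReentrantReadWriteLock} so that each value doubles as its own read-write lock,
* avoiding a separate lock object per entry.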
*
* @param <K> Key type.
* @param <V> Value type.
*/
private static class StatefulValue<K, V> extends ReentrantReadWriteLock {
/** */
private static final long serialVersionUID = 0L;
/** Value. */
@GridToStringInclude(sensitive = true)
private Entry<? extends K, ? extends V> val;
/** Store operation. */
private StoreOperation storeOperation;
/** Value status. */
private ValueStatus valStatus;
/** Condition to wait for flush event */
private Condition flushCond = writeLock().newCondition();
/**
* Creates a state-value pair with {@link ValueStatus#NEW} status.
*
* @param val Value.
* @param storeOperation Store operation.
*/
private StatefulValue(Entry<? extends K, ? extends V> val, StoreOperation storeOperation) {
assert storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV;
this.val = val;
this.storeOperation = storeOperation;
valStatus = ValueStatus.NEW;
}
/**
* @return Stored value.
*/
private Entry<? extends K, ? extends V> entry() {
return val;
}
/**
* @return Store operation.
*/
private StoreOperation operation() {
return storeOperation;
}
/**
* @return Value status.
*/
private ValueStatus status() {
return valStatus;
}
/**
* Updates value status.
*
* @param valStatus Value status.
*/
private void status(ValueStatus valStatus) {
this.valStatus = valStatus;
}
/**
* Updates both value and value status.
*
* @param val Value.
* @param storeOperation Store operation.
* @param valStatus Value status.
*/
private void update(@Nullable Entry<? extends K, ? extends V> val,
StoreOperation storeOperation,
ValueStatus valStatus) {
this.val = val;
this.storeOperation = storeOperation;
this.valStatus = valStatus;
}
/**
* Awaits a signal on flush condition.
*
* @throws IgniteInterruptedCheckedException If thread was interrupted.
*/
private void waitForFlush() throws IgniteInterruptedCheckedException {
U.await(flushCond);
}
/**
* Signals flush condition.
*/
@SuppressWarnings({"SignalWithoutCorrespondingAwait"})
private void signalFlushed() {
flushCond.signalAll();
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof StatefulValue))
return false;
StatefulValue other = (StatefulValue)o;
return F.eq(val, other.val) && F.eq(valStatus, other.valStatus);
}
/** {@inheritDoc} */
@Override public int hashCode() {
int res = val != null ? val.hashCode() : 0;
res = 31 * res + valStatus.hashCode();
return res;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(StatefulValue.class, this);
}
}
}