blob: e1d9137c0e7717d7b2c922a2ae4a6b968f8c6953 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.pagemem;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
/**
* Delayed page writes tracker. Provides delayed write implementations and allows to check if page is actually being
* written to page store.
*/
public class DelayedPageReplacementTracker {
/** Page size. */
private final int pageSize;
/** Flush dirty page real implementation. */
private final PageStoreWriter flushDirtyPage;
/** Logger. */
private final IgniteLogger log;
/** Lock stripes for pages read protection. */
private final Stripe[] stripes;
/** Byte buffer thread local. */
private final ThreadLocal<ByteBuffer> byteBufThreadLoc
= new ThreadLocal<ByteBuffer>() {
@Override protected ByteBuffer initialValue() {
ByteBuffer buf = ByteBuffer.allocateDirect(pageSize);
buf.order(ByteOrder.LITTLE_ENDIAN);
return buf;
}
};
/**
* Dirty page write for replacement operations thread local. Because page write {@link PageStoreWriter} is
* stateful and not thread safe, this thread local protects from GC pressure on pages replacement. <br> Map is used
* instead of build-in thread local to allow GC to remove delayed writers for alive threads after node stop.
*/
private final Map<Long, DelayedDirtyPageStoreWrite> delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
/**
* @param pageSize Page size.
* @param flushDirtyPage Flush dirty page.
* @param log Logger.
* @param segmentCnt Segments count.
*/
public DelayedPageReplacementTracker(
int pageSize,
PageStoreWriter flushDirtyPage,
IgniteLogger log,
int segmentCnt
) {
this.pageSize = pageSize;
this.flushDirtyPage = flushDirtyPage;
this.log = log;
stripes = new Stripe[segmentCnt];
for (int i = 0; i < stripes.length; i++)
stripes[i] = new Stripe();
}
/**
* @return delayed page write implementation, finish method to be called to actually write page.
*/
public DelayedDirtyPageStoreWrite delayedPageWrite() {
return delayedPageWriteThreadLocMap.computeIfAbsent(Thread.currentThread().getId(),
id -> new DelayedDirtyPageStoreWrite(flushDirtyPage, byteBufThreadLoc, pageSize, this));
}
/**
* @param id Full page ID
* @return stripe related to current page identifier.
*/
private Stripe stripe(FullPageId id) {
int segmentIdx = PageMemoryImpl.segmentIndex(id.groupId(), id.pageId(), stripes.length);
return stripes[segmentIdx];
}
/**
* @param id full page ID to lock from read
*/
public void lock(FullPageId id) {
stripe(id).lock(id);
}
/**
* Method is returned when page is available to be loaded from store, or waits for replacement finish.
*
* @param id full page ID to be loaded from store.
*/
public void waitUnlock(FullPageId id) {
stripe(id).waitUnlock(id);
}
/**
* @param id full page ID, which write has been finished, it is available for reading.
*/
public void unlock(FullPageId id) {
stripe(id).unlock(id);
}
/**
* Stripe for locking pages from reading from store in parallel with not finished write.
*/
private class Stripe {
/**
* Page IDs which are locked for reading from store. Page content is being written right now. guarded by
* collection object monitor.
*/
private final Collection<FullPageId> locked = new HashSet<>(Runtime.getRuntime().availableProcessors() * 2);
/**
* Has locked pages, flag for fast check if there are some pages, what were replaced and is being written. Write
* to field is guarded by {@link #locked} monitor.
*/
private volatile boolean hasLockedPages;
/**
* @param id full page ID to lock from read
*/
public void lock(FullPageId id) {
synchronized (locked) {
hasLockedPages = true;
boolean add = locked.add(id);
assert add : "Double locking of page for replacement is not possible";
}
}
/**
* Method is returned when page is available to be loaded from store, or waits for replacement finish.
*
* @param id full page ID to be loaded from store.
*/
public void waitUnlock(FullPageId id) {
if (!hasLockedPages)
return;
synchronized (locked) {
if (!hasLockedPages)
return;
while (locked.contains(id)) {
if (log.isDebugEnabled())
log.debug("Found replaced page [" + id + "] which is being written to page store, wait for finish replacement");
try {
locked.wait();
}
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
}
}
}
/**
* @param id full page ID, which write has been finished, it is available for reading.
*/
public void unlock(FullPageId id) {
synchronized (locked) {
boolean rmv = locked.remove(id);
assert rmv : "Unlocking page ID never locked, id " + id;
if (locked.isEmpty())
hasLockedPages = false;
locked.notifyAll();
}
}
}
}