blob: 505a02b65609279d7256b597d0355387ec861884 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wicket.page;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import org.apache.wicket.Application;
import org.apache.wicket.pageStore.IPageStore;
import org.apache.wicket.settings.ExceptionSettings.ThreadDumpStrategy;
import org.apache.wicket.util.LazyInitializer;
import org.apache.wicket.util.lang.Threads;
import org.apache.wicket.util.time.Durations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Synchronizes access to page instances from multiple threads.
*
* @author Igor Vaynberg (ivaynberg)
*/
public class PageAccessSynchronizer implements Serializable
{
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(PageAccessSynchronizer.class);
/**
* Map of which pages are owned by which threads, keyed by page id. Each value records the
* thread that currently holds the lock for that page.
* <p>
* NOTE(review): the map is handed out through a {@link LazyInitializer}; presumably it is
* only created on the first {@code get()} call - confirm against LazyInitializer.
*/
private final Supplier<ConcurrentMap<Integer, PageLock>> locks = new LazyInitializer<ConcurrentMap<Integer, PageLock>>()
{
private static final long serialVersionUID = 1L;
/** creates the (initially empty) concurrent map that holds the page locks */
@Override
protected ConcurrentMap<Integer, PageLock> createInstance()
{
return new ConcurrentHashMap<>();
}
};
/** timeout value for acquiring a page lock, as returned by {@link #getTimeout(int)} */
private final Duration timeout;
/**
* Constructor
*
* @param timeout
* timeout value for acquiring a page lock; it is stored as-is and returned by
* {@link #getTimeout(int)} for every page id
*/
public PageAccessSynchronizer(Duration timeout)
{
this.timeout = timeout;
}
/**
 * Computes how much of the allowed timeout is still left.
 *
 * @param start
 *            the instant at which waiting started
 * @param timeout
 *            the total time that may be spent waiting
 * @return the time left in milliseconds, or {@code 0} if the timeout is already used up
 */
private static long remaining(Instant start, Duration timeout)
{
    final Duration left = timeout.minus(Durations.elapsedSince(start));
    final long leftMillis = left.toMillis();
    // never report a negative amount of time
    return leftMillis < 0 ? 0 : leftMillis;
}
/**
* Returns the timeout to use when acquiring the lock for the given page.
* <p>
* This implementation ignores {@code pageId} and always returns the timeout passed to the
* constructor.
*
* @param pageId
* the id of the page to be locked
* @return the duration for acquiring a page lock
*/
public Duration getTimeout(int pageId)
{
return timeout;
}
/**
 * Acquire a lock to a page
 * <p>
 * The lock is owned by the calling thread. A lock that is already owned by the calling
 * thread is accepted as acquired. If another thread owns the lock, the call waits until
 * that thread releases it or until the timeout returned by {@link #getTimeout(int)} has
 * elapsed.
 *
 * @param pageId
 *            page id
 * @throws CouldNotLockPageException
 *             if lock could not be acquired
 */
public void lockPage(int pageId) throws CouldNotLockPageException
{
    final Thread thread = Thread.currentThread();
    final PageLock lock = new PageLock(pageId, thread);
    final Instant start = Instant.now();
    boolean locked = false;
    final boolean isDebugEnabled = logger.isDebugEnabled();
    PageLock previous = null;
    Duration timeout = getTimeout(pageId);

    // waitForRelease() may return because of a release, a timeout or a spurious wakeup,
    // so the lock attempt is simply repeated until it succeeds or the time is used up
    while (!locked && Durations.elapsedSince(start).compareTo(timeout) < 0)
    {
        if (isDebugEnabled)
        {
            logger.debug("'{}' attempting to acquire lock to page with id '{}'",
                thread.getName(), pageId);
        }

        previous = locks.get().putIfAbsent(pageId, lock);

        if (previous == null || previous.thread == thread)
        {
            // first thread to acquire lock or lock is already owned by this thread
            locked = true;
        }
        else
        {
            // wait for a lock to become available
            long remaining = remaining(start, timeout);
            if (remaining > 0)
            {
                previous.waitForRelease(remaining, isDebugEnabled);
            }
        }
    }

    if (locked)
    {
        if (isDebugEnabled)
        {
            logger.debug("{} acquired lock to page {}", thread.getName(), pageId);
        }
        return;
    }

    if (logger.isWarnEnabled())
    {
        // 'previous' is only assigned inside the loop above. It is still null when the
        // timeout had already elapsed before the first attempt (e.g. a zero timeout), in
        // which case no lock holder is known and must not be dereferenced.
        final Thread holder = previous != null ? previous.thread : null;

        logger.warn(
            "Thread '{}' failed to acquire lock to page with id '{}', attempted for {} out of allowed {}." +
                " The thread that holds the lock has name '{}'.",
            thread.getName(), pageId, Duration.between(start, Instant.now()), timeout,
            holder != null ? holder.getName() : "<unknown>");

        dumpThreadsAfterLockFailure(holder);
    }

    throw new CouldNotLockPageException(pageId, thread.getName(), timeout);
}

/**
 * Logs thread dumps according to the application's configured thread dump strategy. Does
 * nothing when no application is available.
 *
 * @param lockHolder
 *            the thread that was seen holding the contested lock, or {@code null} if none
 *            is known
 */
private static void dumpThreadsAfterLockFailure(Thread lockHolder)
{
    if (!Application.exists())
    {
        return;
    }

    ThreadDumpStrategy strategy = Application.get()
        .getExceptionSettings()
        .getThreadDumpStrategy();

    switch (strategy)
    {
        case ALL_THREADS :
            Threads.dumpAllThreads(logger);
            break;
        case THREAD_HOLDING_LOCK :
            // nothing to dump when the holder is unknown
            if (lockHolder != null)
            {
                Threads.dumpSingleThread(logger, lockHolder);
            }
            break;
        case NO_THREADS :
        default :
            // do nothing
    }
}
/**
* Unlocks all pages locked by this thread, i.e. the thread calling this method. Threads
* waiting for any of the released locks are notified.
*/
public void unlockAllPages()
{
// a null page id means "every lock owned by the calling thread"
internalUnlockPages(null);
}
/**
* Unlocks a single page locked by the current thread. Does nothing if the current thread
* does not hold the lock for that page.
*
* @param pageId
* the id of the page which should be unlocked.
*/
public void unlockPage(int pageId)
{
internalUnlockPages(pageId);
}
/**
 * Releases locks owned by the calling thread and wakes up threads waiting for them.
 *
 * @param pageId
 *            the id of the single page to unlock, or {@code null} to unlock every page
 *            locked by the calling thread
 */
private void internalUnlockPages(final Integer pageId)
{
    final Thread owner = Thread.currentThread();
    final boolean debug = logger.isDebugEnabled();
    final boolean unlockAll = pageId == null;

    for (Iterator<PageLock> it = this.locks.get().values().iterator(); it.hasNext();)
    {
        final PageLock candidate = it.next();

        // with a page id given, only the lock for exactly that page is of interest
        if (!unlockAll && pageId != candidate.pageId)
        {
            continue;
        }

        // never touch locks that belong to other threads
        if (candidate.thread != owner)
        {
            continue;
        }

        it.remove();

        if (debug)
        {
            logger.debug("'{}' released lock to page with id '{}'", owner.getName(),
                candidate.pageId);
        }

        // wake up every thread that is blocked waiting for this lock
        candidate.markReleased(debug);

        if (!unlockAll)
        {
            // the single requested page has been unlocked, nothing left to do
            return;
        }
    }
}
/*
* used by tests
*
* Returns the supplier of the page id to lock map itself, not a copy.
*/
Supplier<ConcurrentMap<Integer, PageLock>> getLocks()
{
return locks;
}
/**
* Wraps a page manager with this synchronizer
* <p>
* The returned manager delegates every call to the given manager. In addition it acquires
* the page lock for the calling thread in {@code getPage} and {@code touchPage}, and
* releases locks in {@code removePage} (single page) and {@code detach} (all pages of the
* calling thread).
*
* @param manager
* the page manager to delegate to
* @return wrapped page manager
*/
public IPageManager adapt(final IPageManager manager)
{
return new IPageManager()
{
@Override
public boolean supportsVersioning()
{
return manager.supportsVersioning();
}
@Override
public IManageablePage getPage(int pageId)
{
IManageablePage page = null;
try
{
// lock first so that no other thread can work with the page while it is loaded
lockPage(pageId);
page = manager.getPage(pageId);
}
finally
{
// no page was obtained (not found, or an exception was thrown): give the lock
// back right away. Otherwise the lock stays with the calling thread.
if (page == null)
{
unlockPage(pageId);
}
}
return page;
}
@Override
public void removePage(IManageablePage page)
{
if (page != null)
{
try
{
manager.removePage(page);
}
finally
{
// release the lock even if the removal failed
unlockPage(page.getPageId());
}
}
}
@Override
public void touchPage(IManageablePage page)
{
// make sure the calling thread owns the page lock before the page is touched;
// lockPage accepts a lock that is already owned by the calling thread
lockPage(page.getPageId());
manager.touchPage(page);
}
@Override
public void clear()
{
manager.clear();
}
@Override
public void untouchPage(IManageablePage page)
{
manager.untouchPage(page);
}
@Override
public void detach()
{
try
{
manager.detach();
}
finally
{
// release every lock held by the calling thread, even if detaching failed
unlockAllPages();
}
}
@Override
public IPageStore getPageStore()
{
return manager.getPageStore();
}
@Override
public void destroy()
{
manager.destroy();
}
};
}
/**
 * Thread's lock on a page
 * <p>
 * Besides recording which thread owns the lock for which page, an instance also serves as
 * the monitor on which other threads wait until the lock has been released.
 *
 * @author igor
 */
public static class PageLock
{
    /** page id */
    private final int pageId;

    /** thread that owns the lock */
    private final Thread thread;

    /** set to {@code true} by {@link #markReleased(boolean)}; never reset afterwards */
    private volatile boolean released = false;

    /**
     * Constructor
     *
     * @param pageId
     *            id of the locked page
     * @param thread
     *            thread that owns the lock
     */
    public PageLock(int pageId, Thread thread)
    {
        this.pageId = pageId;
        this.thread = thread;
    }

    /**
     * @return page id of locked page
     */
    public int getPageId()
    {
        return pageId;
    }

    /**
     * @return thread that owns the lock
     */
    public Thread getThread()
    {
        return thread;
    }

    /**
     * Blocks the calling thread until this lock is marked as released or the given time has
     * passed. Returns immediately if the lock has already been released.
     * <p>
     * The method does not report why it returned, and {@link Object#wait(long)} may also
     * wake up spuriously, so callers have to re-check whether the lock is available.
     *
     * @param remaining
     *            maximum time to wait in milliseconds; callers should pass a value greater
     *            than zero because {@code wait(0)} waits without a timeout
     * @param isDebugEnabled
     *            whether debug messages should be logged
     * @throws RuntimeException
     *             wrapping the {@link InterruptedException} if the calling thread is
     *             interrupted while waiting; the interrupt status is restored beforehand
     */
    final synchronized void waitForRelease(long remaining, boolean isDebugEnabled)
    {
        if (released)
        {
            // the thread holding the lock released it before we were able to wait for the
            // release
            if (isDebugEnabled)
            {
                logger.debug(
                    "lock for page with id {} no longer locked by {}, falling through", pageId,
                    thread.getName());
            }
            return;
        }

        if (isDebugEnabled)
        {
            // the waiter is the calling thread; 'thread' is the owner of this lock
            logger.debug("{} waiting for lock to page {} for {}",
                Thread.currentThread().getName(), pageId, Duration.ofMillis(remaining));
        }

        try
        {
            wait(remaining);
        }
        catch (InterruptedException e)
        {
            // catching InterruptedException clears the interrupt status; restore it so
            // that code further up the stack can still see that the thread was interrupted
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Marks this lock as released and wakes up all threads blocked in
     * {@link #waitForRelease(long, boolean)}.
     *
     * @param isDebugEnabled
     *            whether debug messages should be logged
     */
    final synchronized void markReleased(boolean isDebugEnabled)
    {
        if (isDebugEnabled)
        {
            logger.debug("'{}' notifying blocked threads", thread.getName());
        }
        released = true;
        notifyAll();
    }
}
}