blob: a4d082d4d690e142eda54a0d5dea4102be5c29c2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.sleepycat.je.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CoalescingCommiter implements Committer
{
private final CommitThread _commitThread;
public CoalescingCommiter(String name, int commiterNotifyThreshold, long commiterWaitTimeout, EnvironmentFacade environmentFacade)
{
_commitThread = new CommitThread("Commit-Thread-" + name, commiterNotifyThreshold, commiterWaitTimeout, environmentFacade);
}
@Override
public void start()
{
_commitThread.start();
}
@Override
public void stop()
{
_commitThread.close();
if (Thread.currentThread() != _commitThread)
{
try
{
_commitThread.join();
}
catch (InterruptedException ie)
{
Thread.currentThread().interrupt();
throw new RuntimeException("Commit thread has not shutdown", ie);
}
}
}
@Override
public void commit(Transaction tx, boolean syncCommit)
{
if(syncCommit)
{
SynchronousCommitThreadJob job = new SynchronousCommitThreadJob();
_commitThread.addJob(job, true);
job.awaitCompletion();
}
}
@Override
public <X> ListenableFuture<X> commitAsync(Transaction tx, X val)
{
ThreadNotifyingSettableFuture<X> future = new ThreadNotifyingSettableFuture<X>();
BDBCommitFutureResult<X> commitFuture = new BDBCommitFutureResult<X>(val, future);
_commitThread.addJob(commitFuture, false);
return future;
}
private static final class BDBCommitFutureResult<X> implements CommitThreadJob
{
private final X _value;
private final ThreadNotifyingSettableFuture<X> _future;
public BDBCommitFutureResult(X value,
final ThreadNotifyingSettableFuture<X> future)
{
_value = value;
_future = future;
}
@Override
public void complete()
{
_future.set(_value);
}
@Override
public void abort(RuntimeException databaseException)
{
_future.setException(databaseException);
}
}
private interface CommitThreadJob
{
void complete();
void abort(RuntimeException e);
}
/**
* Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
* completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
private static class CommitThread extends Thread
{
private static final Logger LOGGER = LoggerFactory.getLogger(CommitThread.class);
private final int _jobQueueNotifyThreshold;
private final long _commiterWaitTimeout;
private final AtomicBoolean _stopped = new AtomicBoolean(false);
private final Queue<CommitThreadJob> _jobQueue = new ConcurrentLinkedQueue<>();
private final Object _lock = new Object();
private final EnvironmentFacade _environmentFacade;
private final List<CommitThreadJob> _inProcessJobs = new ArrayList<>(256);
public CommitThread(String name, int commiterNotifyThreshold, long commiterWaitTimeout, EnvironmentFacade environmentFacade)
{
super(name);
this._jobQueueNotifyThreshold = commiterNotifyThreshold;
this._commiterWaitTimeout = commiterWaitTimeout;
_environmentFacade = environmentFacade;
}
public void explicitNotify()
{
synchronized (_lock)
{
_lock.notifyAll();
}
}
@Override
public void run()
{
while (!_stopped.get())
{
synchronized (_lock)
{
while (!_stopped.get() && !hasJobs())
{
try
{
// Periodically wake up and check, just in case we
// missed a notification. Don't want to lock the broker hard.
_lock.wait(_commiterWaitTimeout);
}
catch (InterruptedException e)
{
}
}
}
processJobs();
}
}
private void processJobs()
{
CommitThreadJob job;
while((job = _jobQueue.poll()) != null)
{
_inProcessJobs.add(job);
}
int completedJobsIndex = 0;
try
{
long startTime = 0;
if(LOGGER.isDebugEnabled())
{
startTime = System.currentTimeMillis();
}
_environmentFacade.flushLog();
if(LOGGER.isDebugEnabled())
{
long duration = System.currentTimeMillis() - startTime;
LOGGER.debug("flushLog completed in " + duration + " ms");
}
while(completedJobsIndex < _inProcessJobs.size())
{
_inProcessJobs.get(completedJobsIndex).complete();
completedJobsIndex++;
}
}
catch (RuntimeException e)
{
try
{
LOGGER.error("Exception during environment log flush", e);
for(; completedJobsIndex < _inProcessJobs.size(); completedJobsIndex++)
{
CommitThreadJob commit = _inProcessJobs.get(completedJobsIndex);
commit.abort(e);
}
}
finally
{
_environmentFacade.flushLogFailed(e);
}
}
finally
{
_inProcessJobs.clear();
}
}
private boolean hasJobs()
{
return !_jobQueue.isEmpty();
}
public void addJob(CommitThreadJob commit, final boolean sync)
{
if (_stopped.get())
{
throw new IllegalStateException("Commit thread is stopped");
}
_jobQueue.add(commit);
if(sync || _jobQueue.size() >= _jobQueueNotifyThreshold)
{
synchronized (_lock)
{
_lock.notifyAll();
}
}
}
public void close()
{
synchronized (_lock)
{
_stopped.set(true);
CommitThreadJob commit;
try
{
_environmentFacade.flushLog();
while ((commit = _jobQueue.poll()) != null)
{
commit.complete();
}
}
catch(RuntimeException flushException)
{
RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted");
int abortedCommits = 0;
while ((commit = _jobQueue.poll()) != null)
{
abortedCommits++;
commit.abort(e);
}
if (LOGGER.isDebugEnabled() && abortedCommits > 0)
{
LOGGER.debug(abortedCommits + " commit(s) were aborted during close.");
}
}
_lock.notifyAll();
}
}
}
private final class ThreadNotifyingSettableFuture<X> extends AbstractFuture<X>
{
@Override
public X get(final long timeout, final TimeUnit unit)
throws InterruptedException, TimeoutException, ExecutionException
{
if(!isDone())
{
_commitThread.explicitNotify();
}
return super.get(timeout, unit);
}
@Override
public X get() throws InterruptedException, ExecutionException
{
if(!isDone())
{
_commitThread.explicitNotify();
}
return super.get();
}
@Override
protected boolean set(final X value)
{
return super.set(value);
}
@Override
protected boolean setException(final Throwable throwable)
{
return super.setException(throwable);
}
@Override
public void addListener(final Runnable listener, final Executor exec)
{
super.addListener(listener, exec);
_commitThread.explicitNotify();
}
}
private class SynchronousCommitThreadJob implements CommitThreadJob
{
private boolean _done;
private RuntimeException _exception;
@Override
public synchronized void complete()
{
_done = true;
notifyAll();
}
@Override
public synchronized void abort(final RuntimeException e)
{
_done = true;
_exception = e;
notifyAll();
}
public synchronized void awaitCompletion()
{
boolean interrupted = false;
while(!_done)
{
try
{
wait();
}
catch (InterruptedException e)
{
interrupted = true;
}
}
if(interrupted)
{
Thread.currentThread().interrupt();
}
if(_exception != null)
{
throw _exception;
}
}
}
}