blob: eb1bffe02b09d4293b3d30afb2ebf0e2648f3eb9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.hints;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.*;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
/**
* A single threaded executor that exclusively writes all the hints and otherwise manipulate the writers.
*
* Flushing demultiplexes the provided {@link HintsBuffer} and sequentially writes to each {@link HintsWriter},
* using the same shared write buffer. In the near future, when CASSANDRA-9428 (compression) is implemented,
* will also share a compression buffer.
*/
final class HintsWriteExecutor
{
static final int WRITE_BUFFER_SIZE = 256 << 10;
private final HintsCatalog catalog;
private final ByteBuffer writeBuffer;
private final ExecutorService executor;
HintsWriteExecutor(HintsCatalog catalog)
{
this.catalog = catalog;
writeBuffer = ByteBuffer.allocateDirect(WRITE_BUFFER_SIZE);
executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("HintsWriteExecutor", 1);
}
/*
* Should be very fast (worst case scenario - write a few 10s of megabytes to disk).
*/
void shutdownBlocking()
{
executor.shutdown();
try
{
executor.awaitTermination(1, TimeUnit.MINUTES);
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
}
/**
* Flush the provided buffer, recycle it and offer it back to the pool.
*/
Future<?> flushBuffer(HintsBuffer buffer, HintsBufferPool bufferPool)
{
return executor.submit(new FlushBufferTask(buffer, bufferPool));
}
/**
* Flush the current buffer, but without clearing/recycling it.
*/
Future<?> flushBufferPool(HintsBufferPool bufferPool)
{
return executor.submit(new FlushBufferPoolTask(bufferPool));
}
/**
* Flush the current buffer just for the specified hints stores. Without clearing/recycling it.
*/
Future<?> flushBufferPool(HintsBufferPool bufferPool, Iterable<HintsStore> stores)
{
return executor.submit(new PartiallyFlushBufferPoolTask(bufferPool, stores));
}
void fsyncWritersBlockingly(Iterable<HintsStore> stores)
{
try
{
executor.submit(new FsyncWritersTask(stores)).get();
}
catch (InterruptedException | ExecutionException e)
{
throw new RuntimeException(e);
}
}
Future<?> closeWriter(HintsStore store)
{
return executor.submit(store::closeWriter);
}
Future<?> closeAllWriters()
{
return executor.submit(() -> catalog.stores().forEach(HintsStore::closeWriter));
}
private final class FlushBufferTask implements Runnable
{
private final HintsBuffer buffer;
private final HintsBufferPool bufferPool;
FlushBufferTask(HintsBuffer buffer, HintsBufferPool bufferPool)
{
this.buffer = buffer;
this.bufferPool = bufferPool;
}
public void run()
{
buffer.waitForModifications();
try
{
flush(buffer);
}
finally
{
HintsBuffer recycledBuffer = buffer.recycle();
bufferPool.offer(recycledBuffer);
}
}
}
private final class FlushBufferPoolTask implements Runnable
{
private final HintsBufferPool bufferPool;
FlushBufferPoolTask(HintsBufferPool bufferPool)
{
this.bufferPool = bufferPool;
}
public void run()
{
HintsBuffer buffer = bufferPool.currentBuffer();
buffer.waitForModifications();
flush(buffer);
}
}
private final class PartiallyFlushBufferPoolTask implements Runnable
{
private final HintsBufferPool bufferPool;
private final Iterable<HintsStore> stores;
PartiallyFlushBufferPoolTask(HintsBufferPool bufferPool, Iterable<HintsStore> stores)
{
this.bufferPool = bufferPool;
this.stores = stores;
}
public void run()
{
HintsBuffer buffer = bufferPool.currentBuffer();
buffer.waitForModifications();
stores.forEach(store -> flush(buffer.consumingHintsIterator(store.hostId), store));
}
}
private final class FsyncWritersTask implements Runnable
{
private final Iterable<HintsStore> stores;
FsyncWritersTask(Iterable<HintsStore> stores)
{
this.stores = stores;
}
public void run()
{
stores.forEach(HintsStore::fsyncWriter);
catalog.fsyncDirectory();
}
}
private void flush(HintsBuffer buffer)
{
buffer.hostIds().forEach(hostId -> flush(buffer.consumingHintsIterator(hostId), catalog.get(hostId)));
}
private void flush(Iterator<ByteBuffer> iterator, HintsStore store)
{
while (true)
{
if (iterator.hasNext())
flushInternal(iterator, store);
if (!iterator.hasNext())
break;
// exceeded the size limit for an individual file, but still have more to write
// close the current writer and continue flushing to a new one in the next iteration
store.closeWriter();
}
}
@SuppressWarnings("resource") // writer not closed here
private void flushInternal(Iterator<ByteBuffer> iterator, HintsStore store)
{
long maxHintsFileSize = DatabaseDescriptor.getMaxHintsFileSize();
HintsWriter writer = store.getOrOpenWriter();
try (HintsWriter.Session session = writer.newSession(writeBuffer))
{
while (iterator.hasNext())
{
session.append(iterator.next());
if (session.position() >= maxHintsFileSize)
break;
}
}
catch (IOException e)
{
throw new FSWriteError(e, writer.descriptor().fileName());
}
}
}