blob: 659d34dfe23511e2767021197ef9ba7cdbae94c6 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.spi.FileSystemProvider;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.memory.MemoryUtil;
public class ListenableFileSystem extends ForwardingFileSystem
public interface PathFilter
boolean accept(Path entry) throws IOException;
public interface Listener
public interface OnPreOpen extends Listener
void preOpen(Path path, Set<? extends OpenOption> options, FileAttribute<?>[] attrs) throws IOException;
public interface OnPostOpen extends Listener
void postOpen(Path path, Set<? extends OpenOption> options, FileAttribute<?>[] attrs, FileChannel channel) throws IOException;
public interface OnPreRead extends Listener
void preRead(Path path, FileChannel channel, long position, ByteBuffer dst) throws IOException;
public interface OnPostRead extends Listener
void postRead(Path path, FileChannel channel, long position, ByteBuffer dst, int read) throws IOException;
public interface OnPreTransferTo extends Listener
void preTransferTo(Path path, FileChannel channel, long position, long count, WritableByteChannel target) throws IOException;
public interface OnPostTransferTo extends Listener
void postTransferTo(Path path, FileChannel channel, long position, long count, WritableByteChannel target, long transfered) throws IOException;
public interface OnPreTransferFrom extends Listener
void preTransferFrom(Path path, FileChannel channel, ReadableByteChannel src, long position, long count) throws IOException;
public interface OnPostTransferFrom extends Listener
void postTransferFrom(Path path, FileChannel channel, ReadableByteChannel src, long position, long count, long transfered) throws IOException;
public interface OnPreWrite extends Listener
void preWrite(Path path, FileChannel channel, long position, ByteBuffer src) throws IOException;
public interface OnPostWrite extends Listener
void postWrite(Path path, FileChannel channel, long position, ByteBuffer src, int wrote) throws IOException;
public interface OnPrePosition extends Listener
void prePosition(Path path, FileChannel channel, long position, long newPosition) throws IOException;
public interface OnPostPosition extends Listener
void postPosition(Path path, FileChannel channel, long position, long newPosition) throws IOException;
public interface OnPreTruncate extends Listener
void preTruncate(Path path, FileChannel channel, long size, long targetSize) throws IOException;
public interface OnPostTruncate extends Listener
void postTruncate(Path path, FileChannel channel, long size, long targetSize, long newSize) throws IOException;
public interface OnPreForce extends Listener
void preForce(Path path, FileChannel channel, boolean metaData) throws IOException;
public interface OnPostForce extends Listener
void postForce(Path path, FileChannel channel, boolean metaData) throws IOException;
public interface Unsubscribable extends AutoCloseable
void close();
private final List<OnPreOpen> onPreOpen = new CopyOnWriteArrayList<>();
private final List<OnPostOpen> onPostOpen = new CopyOnWriteArrayList<>();
private final List<OnPreTransferTo> onPreTransferTo = new CopyOnWriteArrayList<>();
private final List<OnPostTransferTo> onPostTransferTo = new CopyOnWriteArrayList<>();
private final List<OnPreRead> onPreRead = new CopyOnWriteArrayList<>();
private final List<OnPostRead> onPostRead = new CopyOnWriteArrayList<>();
private final List<OnPreWrite> onPreWrite = new CopyOnWriteArrayList<>();
private final List<OnPostWrite> onPostWrite = new CopyOnWriteArrayList<>();
private final List<OnPreTransferFrom> onPreTransferFrom = new CopyOnWriteArrayList<>();
private final List<OnPostTransferFrom> onPostTransferFrom = new CopyOnWriteArrayList<>();
private final List<OnPreForce> onPreForce = new CopyOnWriteArrayList<>();
private final List<OnPostForce> onPostForce = new CopyOnWriteArrayList<>();
private final List<OnPreTruncate> onPreTruncate = new CopyOnWriteArrayList<>();
private final List<OnPostTruncate> onPostTruncate = new CopyOnWriteArrayList<>();
private final List<OnPrePosition> onPrePosition = new CopyOnWriteArrayList<>();
private final List<OnPostPosition> onPostPosition = new CopyOnWriteArrayList<>();
private final List<List<? extends Listener>> lists = Arrays.asList(onPreOpen, onPostOpen,
onPreRead, onPostRead,
onPreTransferTo, onPostTransferTo,
onPreWrite, onPostWrite,
onPreTransferFrom, onPostTransferFrom,
onPreForce, onPostForce,
onPreTruncate, onPostTruncate,
onPrePosition, onPostPosition);
private final ListenableFileSystemProvider provider;
public ListenableFileSystem(FileSystem delegate)
this.provider = new ListenableFileSystemProvider(super.provider());
public Unsubscribable listen(Listener listener)
List<List<? extends Listener>> matches = new ArrayList<>(1);
if (listener instanceof OnPreOpen)
onPreOpen.add((OnPreOpen) listener);
if (listener instanceof OnPostOpen)
onPostOpen.add((OnPostOpen) listener);
if (listener instanceof OnPreRead)
onPreRead.add((OnPreRead) listener);
if (listener instanceof OnPostRead)
onPostRead.add((OnPostRead) listener);
if (listener instanceof OnPreTransferTo)
onPreTransferTo.add((OnPreTransferTo) listener);
if (listener instanceof OnPostTransferTo)
onPostTransferTo.add((OnPostTransferTo) listener);
if (listener instanceof OnPreWrite)
onPreWrite.add((OnPreWrite) listener);
if (listener instanceof OnPostWrite)
onPostWrite.add((OnPostWrite) listener);
if (listener instanceof OnPreTransferFrom)
onPreTransferFrom.add((OnPreTransferFrom) listener);
if (listener instanceof OnPostTransferFrom)
onPostTransferFrom.add((OnPostTransferFrom) listener);
if (listener instanceof OnPreForce)
onPreForce.add((OnPreForce) listener);
if (listener instanceof OnPostForce)
onPostForce.add((OnPostForce) listener);
if (listener instanceof OnPreTruncate)
onPreTruncate.add((OnPreTruncate) listener);
if (listener instanceof OnPostTruncate)
onPostTruncate.add((OnPostTruncate) listener);
if (listener instanceof OnPrePosition)
onPrePosition.add((OnPrePosition) listener);
if (listener instanceof OnPostPosition)
onPostPosition.add((OnPostPosition) listener);
if (matches.isEmpty())
throw new IllegalArgumentException("Unable to find a listenable type for " + listener.getClass());
return () -> remove(matches, listener);
public Unsubscribable onPreOpen(OnPreOpen callback)
return listen(callback);
public Unsubscribable onPreOpen(PathFilter filter, OnPreOpen callback)
return onPreOpen((path, options, attrs) -> {
if (filter.accept(path))
callback.preOpen(path, options, attrs);
public Unsubscribable onPostOpen(OnPostOpen callback)
return listen(callback);
public Unsubscribable onPostOpen(PathFilter filter, OnPostOpen callback)
return onPostOpen((path, options, attrs, channel) -> {
if (filter.accept(path))
callback.postOpen(path, options, attrs, channel);
public Unsubscribable onPreRead(OnPreRead callback)
return listen(callback);
public Unsubscribable onPreRead(PathFilter filter, OnPreRead callback)
return onPreRead((path, channel, position, dst) -> {
if (filter.accept(path))
callback.preRead(path, channel, position, dst);
public Unsubscribable onPostRead(OnPostRead callback)
return listen(callback);
public Unsubscribable onPostRead(PathFilter filter, OnPostRead callback)
return onPostRead((path, channel, position, dst, read) -> {
if (filter.accept(path))
callback.postRead(path, channel, position, dst, read);
public Unsubscribable onPreTransferTo(OnPreTransferTo callback)
return listen(callback);
public Unsubscribable onPreTransferTo(PathFilter filter, OnPreTransferTo callback)
return onPreTransferTo((path, channel, position, count, target) -> {
if (filter.accept(path))
callback.preTransferTo(path, channel, position, count, target);
public Unsubscribable onPostTransferTo(OnPostTransferTo callback)
return listen(callback);
public Unsubscribable onPostTransferTo(PathFilter filter, OnPostTransferTo callback)
return onPostTransferTo((path, channel, position, count, target, transfered) -> {
if (filter.accept(path))
callback.postTransferTo(path, channel, position, count, target, transfered);
public Unsubscribable onPreTransferFrom(OnPreTransferFrom callback)
return listen(callback);
public Unsubscribable onPreTransferFrom(PathFilter filter, OnPreTransferFrom callback)
return onPreTransferFrom((path, channel, src, position, count) -> {
if (filter.accept(path))
callback.preTransferFrom(path, channel, src, position, count);
public Unsubscribable onPostTransferFrom(OnPostTransferFrom callback)
return listen(callback);
public Unsubscribable onPostTransferFrom(PathFilter filter, OnPostTransferFrom callback)
return onPostTransferFrom((path, channel, src, position, count, transfered) -> {
if (filter.accept(path))
callback.postTransferFrom(path, channel, src, position, count, transfered);
public Unsubscribable onPreWrite(OnPreWrite callback)
return listen(callback);
public Unsubscribable onPreWrite(PathFilter filter, OnPreWrite callback)
return onPreWrite((path, channel, position, src) -> {
if (filter.accept(path))
callback.preWrite(path, channel, position, src);
public Unsubscribable onPostWrite(OnPostWrite callback)
return listen(callback);
public Unsubscribable onPostWrite(PathFilter filter, OnPostWrite callback)
return onPostWrite((path, channel, position, src, wrote) -> {
if (filter.accept(path))
callback.postWrite(path, channel, position, src, wrote);
public Unsubscribable onPrePosition(OnPrePosition callbackk)
return listen(callbackk);
public Unsubscribable onPrePosition(PathFilter filter, OnPrePosition callbackk)
return onPrePosition((path, channel, position, newPosition) -> {
if (filter.accept(path))
callbackk.prePosition(path, channel, position, newPosition);
public Unsubscribable onPostPosition(OnPostPosition callbackk)
return listen(callbackk);
public Unsubscribable onPostPosition(PathFilter filter, OnPostPosition callbackk)
return onPostPosition((path, channel, position, newPosition) -> {
if (filter.accept(path))
callbackk.postPosition(path, channel, position, newPosition);
public Unsubscribable onPreTruncate(OnPreTruncate callbackk)
return listen(callbackk);
public Unsubscribable onPreTruncate(PathFilter filter, OnPreTruncate callbackk)
return onPreTruncate((path, channel, size, targetSize) -> {
if (filter.accept(path))
callbackk.preTruncate(path, channel, size, targetSize);
public Unsubscribable onPostTruncate(OnPostTruncate callbackk)
return listen(callbackk);
public Unsubscribable onPostTruncate(PathFilter filter, OnPostTruncate callbackk)
return onPostTruncate((path, channel, size, targetSize, newSize) -> {
if (filter.accept(path))
callbackk.postTruncate(path, channel, size, targetSize, newSize);
public Unsubscribable onPreForce(OnPreForce callbackk)
return listen(callbackk);
public Unsubscribable onPreForce(PathFilter filter, OnPreForce callback)
return onPreForce((path, channel, metadata) -> {
if (filter.accept(path))
callback.preForce(path, channel, metadata);
public Unsubscribable onPostForce(OnPostForce callbackk)
return listen(callbackk);
public Unsubscribable onPostForce(PathFilter filter, OnPostForce callback)
return onPostForce((path, channel, metadata) -> {
if (filter.accept(path))
callback.postForce(path, channel, metadata);
public void remove(Listener listener)
remove(lists, listener);
private static void remove(List<List<? extends Listener>> lists, Listener listener)
lists.forEach(l -> l.remove(listener));
public void clearListeners()
private interface ListenerAction<T>
void accept(T value) throws IOException;
private <T> void notifyListeners(List<T> listeners, ListenerAction<T> fn) throws IOException
for (T listener : listeners)
protected Path wrap(Path p)
return p instanceof ListenablePath ? p : new ListenablePath(p);
protected Path unwrap(Path p)
return p instanceof ListenablePath ? ((ListenablePath) p).delegate : p;
public ListenableFileSystemProvider provider()
return provider;
private class ListenableFileSystemProvider extends ForwardingFileSystemProvider
ListenableFileSystemProvider(FileSystemProvider delegate)
protected Path wrap(Path a)
return ListenableFileSystem.this.wrap(a);
protected Path unwrap(Path p)
return ListenableFileSystem.this.unwrap(p);
public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException
int len = options.length;
Set<OpenOption> opts = new HashSet<>(len + 3);
if (len == 0)
for (OpenOption opt : options)
if (opt == StandardOpenOption.READ)
throw new IllegalArgumentException("READ not allowed");
return Channels.newOutputStream(newFileChannel(path, opts));
public InputStream newInputStream(Path path, OpenOption... options) throws IOException
for (OpenOption opt : options)
// All OpenOption values except for APPEND and WRITE are allowed
if (opt == StandardOpenOption.APPEND ||
opt == StandardOpenOption.WRITE)
throw new UnsupportedOperationException("'" + opt + "' not allowed");
Set<OpenOption> opts = new HashSet<>(Arrays.asList(options));
return Channels.newInputStream(newFileChannel(path, opts));
public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException
return newFileChannel(path, options, attrs);
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException
notifyListeners(onPreOpen, l -> l.preOpen(path, options, attrs));
ListenableFileChannel channel = new ListenableFileChannel(path, delegate().newFileChannel(unwrap(path), options, attrs));
notifyListeners(onPostOpen, l -> l.postOpen(path, options, attrs, channel));
return channel;
public AsynchronousFileChannel newAsynchronousFileChannel(Path path, Set<? extends OpenOption> options, ExecutorService executor, FileAttribute<?>... attrs) throws IOException
throw new UnsupportedOperationException("TODO");
// block the APIs that try to switch FileSystem based off schema
public FileSystem newFileSystem(URI uri, Map<String, ?> env) throws IOException
throw new UnsupportedOperationException();
public FileSystem newFileSystem(Path path, Map<String, ?> env) throws IOException
throw new UnsupportedOperationException();
public String getScheme()
throw new UnsupportedOperationException();
public FileSystem getFileSystem(URI uri)
throw new UnsupportedOperationException();
private class ListenablePath extends ForwardingPath
public ListenablePath(Path delegate)
protected Path wrap(Path a)
return ListenableFileSystem.this.wrap(a);
protected Path unwrap(Path p)
return ListenableFileSystem.this.unwrap(p);
public FileSystem getFileSystem()
return ListenableFileSystem.this;
public File toFile()
if (delegate().getFileSystem() == FileSystems.getDefault())
return delegate().toFile();
throw new UnsupportedOperationException();
private class ListenableFileChannel extends ForwardingFileChannel
private final AtomicReference<Mapped> mutable = new AtomicReference<>();
private final Path path;
ListenableFileChannel(Path path, FileChannel delegate)
this.path = path;
public int read(ByteBuffer dst) throws IOException
long position = position();
notifyListeners(onPreRead, l -> l.preRead(path, this, position, dst));
int read =;
notifyListeners(onPostRead, l -> l.postRead(path, this, position, dst, read));
return read;
public int read(ByteBuffer dst, long position) throws IOException
notifyListeners(onPreRead, l -> l.preRead(path, this, position, dst));
int read =, position);
notifyListeners(onPostRead, l -> l.postRead(path, this, position, dst, read));
return read;
public int write(ByteBuffer src) throws IOException
long position = position();
notifyListeners(onPreWrite, l -> l.preWrite(path, this, position, src));
int write = super.write(src);
notifyListeners(onPostWrite, l -> l.postWrite(path, this, position, src, write));
return write;
public int write(ByteBuffer src, long position) throws IOException
notifyListeners(onPreWrite, l -> l.preWrite(path, this, position, src));
int write = super.write(src, position);
notifyListeners(onPostWrite, l -> l.postWrite(path, this, position, src, write));
return write;
public FileChannel position(long newPosition) throws IOException
long position = position();
notifyListeners(onPrePosition, l -> l.prePosition(path, this, position, newPosition));
notifyListeners(onPostPosition, l -> l.postPosition(path, this, position, newPosition));
return this;
public FileChannel truncate(long size) throws IOException
long currentSize = this.size();
notifyListeners(onPreTruncate, l -> l.preTruncate(path, this, currentSize, size));
long latestSize = this.size();
notifyListeners(onPostTruncate, l -> l.postTruncate(path, this, currentSize, size, latestSize));
return this;
public void force(boolean metaData) throws IOException
notifyListeners(onPreForce, l -> l.preForce(path, this, metaData));
notifyListeners(onPostForce, l -> l.postForce(path, this, metaData));
public long transferTo(long position, long count, WritableByteChannel target) throws IOException
notifyListeners(onPreTransferTo, l -> l.preTransferTo(path, this, position, count, target));
long transfered = super.transferTo(position, count, target);
notifyListeners(onPostTransferTo, l -> l.postTransferTo(path, this, position, count, target, transfered));
return transfered;
public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException
notifyListeners(onPreTransferFrom, l -> l.preTransferFrom(path, this, src, position, count));
long transfered = super.transferFrom(src, position, count);
notifyListeners(onPostTransferFrom, l -> l.postTransferFrom(path, this, src, position, count, transfered));
return transfered;
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException
// this behavior isn't 100% correct... if you mix access with FileChanel and ByteBuffer you will get different
// results than with a real mmap solution... This limitation is due to ByteBuffer being private, so not able
// to create custom BBs to mimc this access...
if (mode == MapMode.READ_WRITE && mutable.get() != null)
throw new UnsupportedOperationException("map called twice with mode READ_WRITE; first was " + mutable.get() + ", now " + new Mapped(mode, null, position, Math.toIntExact(size)));
int isize = Math.toIntExact(size);
MappedByteBuffer bb = (MappedByteBuffer) ByteBuffer.allocateDirect(isize);
Mapped mapped = new Mapped(mode, bb, position, isize);
if (mode == MapMode.READ_ONLY)
ByteBufferUtil.readFully(this,, position);;
Runnable forcer = () -> {
MemoryUtil.setAttachment(bb, forcer);
else if (mode == MapMode.READ_WRITE)
if (delegate().size() - position > 0)
ByteBufferUtil.readFully(this,, position);;
// with real files the FD gets copied so the close of the channel does not block the BB mutation
// from flushing... it's possible to support this use case, but kept things simplier for now by
// failing if the backing channel was closed.
Runnable forcer = () -> {
ByteBuffer local = bb.duplicate();
long pos = position;
while (local.hasRemaining())
int wrote = write(local, pos);
if (wrote == -1)
throw new EOFException();
pos += wrote;
catch (IOException e)
throw new UncheckedIOException(e);
MemoryUtil.setAttachment(bb, forcer);
if (!mutable.compareAndSet(null, mapped))
throw new UnsupportedOperationException("map called twice");
throw new UnsupportedOperationException("Unsupported mode: " + mode);
protected void implCloseChannel() throws IOException
private static class Mapped
final FileChannel.MapMode mode;
final MappedByteBuffer bb;
final long position;
final int size;
private Mapped(FileChannel.MapMode mode, MappedByteBuffer bb, long position, int size)
this.mode = mode; = bb;
this.position = position;
this.size = size;
public String toString()
return "Mapped{" +
"mode=" + mode +
", position=" + position +
", size=" + size +