blob: 2ae854562bb7f1846c9b3f86991cb098dd80660c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.container.disk;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* An implementation of {@link DiskSpaceMonitor} that polls for disk usage based on a specified
* polling interval.
* <p>
* This class is thread-safe.
*/
public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
private enum State { INIT, RUNNING, STOPPED }
private static final Logger log = LoggerFactory.getLogger(PollingScanDiskSpaceMonitor.class);
// Note: we use this as a set where the value is always Boolean.TRUE.
private final ConcurrentMap<Listener, Boolean> listenerSet = new ConcurrentHashMap<>();
// Used to guard write access to state and listenerSet.
private final Object lock = new Object();
private final ScheduledExecutorService schedulerService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("Samza PollingScanDiskSpaceMonitor Thread-%d")
.setDaemon(true)
.build());
private final Set<Path> watchPaths;
private final long pollingIntervalMillis;
private State state = State.INIT;
/**
* Returns the total size in bytes used by the specified paths. This function guarantees that it
* will not double count overlapping portions of the path set. For example, with a trivial
* overlap of /A and /A, it will count /A only once. It also handles other types of overlaps
* similarly, such as counting /A/B only once given the paths /A and /A/B.
* <p>
* This function is exposed as package private to simplify testing various cases without involving
* an executor. Alternatively this could have been pulled out to a utility class, but it would
* unnecessarily pollute the global namespace.
*/
static long getSpaceUsed(Set<Path> paths) {
ArrayDeque<Path> pathStack = new ArrayDeque<>();
for (Path path : paths) {
pathStack.push(path);
}
// Track the directories we've visited to ensure we're not double counting. It would be
// preferable to resolve overlap once at startup, but the problem is that the filesystem may
// change over time and, in fact, at startup I found that the rocks DB store directory was not
// created by the time the disk space monitor was started.
Set<Path> visited = new HashSet<>();
long totalBytes = 0;
while (!pathStack.isEmpty()) {
try {
// We need to resolve to the real path to ensure that we don't inadvertently double count
// due to different paths to the same directory (e.g. /A and /A/../A).
Path current = pathStack.pop().toRealPath();
if (visited.contains(current)) {
continue;
}
visited.add(current);
BasicFileAttributes currentAttrs = Files.readAttributes(current,
BasicFileAttributes.class);
if (currentAttrs.isDirectory()) {
try (DirectoryStream<Path> directoryListing = Files.newDirectoryStream(current)) {
for (Path child : directoryListing) {
pathStack.push(child);
}
}
} else if (currentAttrs.isRegularFile()) {
totalBytes += currentAttrs.size();
}
} catch (IOException e) {
// If we can't stat the file, just ignore it. This can happen, for example, if we scan
// a directory, but by the time we get to stat'ing the file it has been deleted (e.g.
// due to compaction, rotation, etc.).
}
}
return totalBytes;
}
/**
* Creates a new disk space monitor that uses a periodic polling mechanism.
*
* @param watchPaths the set of paths to watch
* @param pollingIntervalMillis the polling interval in milliseconds
*/
public PollingScanDiskSpaceMonitor(Set<Path> watchPaths, long pollingIntervalMillis) {
this.watchPaths = Collections.unmodifiableSet(new HashSet<>(watchPaths));
this.pollingIntervalMillis = pollingIntervalMillis;
}
@Override
public void start() {
synchronized (lock) {
switch (state) {
case INIT:
schedulerService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
updateSample();
}
}, pollingIntervalMillis, pollingIntervalMillis, TimeUnit.MILLISECONDS);
state = State.RUNNING;
break;
case RUNNING:
// start is idempotent
return;
case STOPPED:
throw new IllegalStateException("PollingScanDiskSpaceMonitor was stopped and cannot be restarted.");
}
}
}
@Override
public void stop() {
synchronized (lock) {
// We could also wait for full termination of the scheduler service, but it is overkill for
// our use case.
schedulerService.shutdownNow();
listenerSet.clear();
state = State.STOPPED;
}
}
@Override
public boolean registerListener(Listener listener) {
synchronized (lock) {
if (state != State.STOPPED) {
return listenerSet.putIfAbsent(listener, Boolean.TRUE) == Boolean.TRUE;
}
}
return false;
}
/**
* Wait until this service has shutdown. Returns true if shutdown occurred within the timeout
* and false otherwise.
* <p>
* This is currently exposed at the package private level for tests only.
*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return schedulerService.awaitTermination(timeout, unit);
}
private void updateSample() {
long totalBytes = getSpaceUsed(watchPaths);
for (Listener listener : listenerSet.keySet()) {
try {
listener.onUpdate(totalBytes);
} catch (Throwable e) {
// catch an exception thrown by one listener so that it does not impact other listeners.
log.error("Exception thrown by a listener ", e);
}
}
}
}