blob: 63f750d3da0ff63234683f2cc6ccf942178f1322 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.common.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manages cleanup of container ZNodes. This class is meant to only
* be run from the leader. There's no harm in running from followers/observers
* but that will be extra work that's not needed. Once started, it periodically
* checks container nodes that have a cversion > 0 and have no children. A
* delete is attempted on the node. The result of the delete is unimportant.
* If the proposal fails or the container node is not empty there's no harm.
*/
public class ContainerManager {
private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class);
private final ZKDatabase zkDb;
private final RequestProcessor requestProcessor;
private final int checkIntervalMs;
private final int maxPerMinute;
private final long maxNeverUsedIntervalMs;
private final Timer timer;
private final AtomicReference<TimerTask> task = new AtomicReference<TimerTask>(null);
/**
* @param zkDb the ZK database
* @param requestProcessor request processer - used to inject delete
* container requests
* @param checkIntervalMs how often to check containers in milliseconds
* @param maxPerMinute the max containers to delete per second - avoids
* herding of container deletions
*/
public ContainerManager(ZKDatabase zkDb, RequestProcessor requestProcessor, int checkIntervalMs, int maxPerMinute) {
this(zkDb, requestProcessor, checkIntervalMs, maxPerMinute, 0);
}
/**
* @param zkDb the ZK database
* @param requestProcessor request processer - used to inject delete
* container requests
* @param checkIntervalMs how often to check containers in milliseconds
* @param maxPerMinute the max containers to delete per second - avoids
* herding of container deletions
* @param maxNeverUsedIntervalMs the max time in milliseconds that a container that has never had
* any children is retained
*/
public ContainerManager(ZKDatabase zkDb, RequestProcessor requestProcessor, int checkIntervalMs, int maxPerMinute, long maxNeverUsedIntervalMs) {
this.zkDb = zkDb;
this.requestProcessor = requestProcessor;
this.checkIntervalMs = checkIntervalMs;
this.maxPerMinute = maxPerMinute;
this.maxNeverUsedIntervalMs = maxNeverUsedIntervalMs;
timer = new Timer("ContainerManagerTask", true);
LOG.info("Using checkIntervalMs={} maxPerMinute={} maxNeverUsedIntervalMs={}", checkIntervalMs, maxPerMinute, maxNeverUsedIntervalMs);
}
/**
* start/restart the timer the runs the check. Can safely be called
* multiple times.
*/
public void start() {
if (task.get() == null) {
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
try {
checkContainers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("interrupted");
cancel();
} catch (Throwable e) {
LOG.error("Error checking containers", e);
}
}
};
if (task.compareAndSet(null, timerTask)) {
timer.scheduleAtFixedRate(timerTask, checkIntervalMs, checkIntervalMs);
}
}
}
/**
* stop the timer if necessary. Can safely be called multiple times.
*/
public void stop() {
TimerTask timerTask = task.getAndSet(null);
if (timerTask != null) {
timerTask.cancel();
}
timer.cancel();
}
/**
* Manually check the containers. Not normally used directly
*/
public void checkContainers() throws InterruptedException {
long minIntervalMs = getMinIntervalMs();
for (String containerPath : getCandidates()) {
long startMs = Time.currentElapsedTime();
ByteBuffer path = ByteBuffer.wrap(containerPath.getBytes());
Request request = new Request(null, 0, 0, ZooDefs.OpCode.deleteContainer, path, null);
try {
LOG.info("Attempting to delete candidate container: {}", containerPath);
postDeleteRequest(request);
} catch (Exception e) {
LOG.error("Could not delete container: {}", containerPath, e);
}
long elapsedMs = Time.currentElapsedTime() - startMs;
long waitMs = minIntervalMs - elapsedMs;
if (waitMs > 0) {
Thread.sleep(waitMs);
}
}
}
// VisibleForTesting
protected void postDeleteRequest(Request request) throws RequestProcessor.RequestProcessorException {
requestProcessor.processRequest(request);
}
// VisibleForTesting
protected long getMinIntervalMs() {
return TimeUnit.MINUTES.toMillis(1) / maxPerMinute;
}
// VisibleForTesting
protected Collection<String> getCandidates() {
Set<String> candidates = new HashSet<String>();
for (String containerPath : zkDb.getDataTree().getContainers()) {
DataNode node = zkDb.getDataTree().getNode(containerPath);
if ((node != null) && node.getChildren().isEmpty()) {
/*
cversion > 0: keep newly created containers from being deleted
before any children have been added. If you were to create the
container just before a container cleaning period the container
would be immediately be deleted.
*/
if (node.stat.getCversion() > 0) {
candidates.add(containerPath);
} else {
/*
Users may not want unused containers to live indefinitely. Allow a system
property to be set that sets the max time for a cversion-0 container
to stay before being deleted
*/
if ((maxNeverUsedIntervalMs != 0) && (getElapsed(node) > maxNeverUsedIntervalMs)) {
candidates.add(containerPath);
}
}
}
if ((node != null) && (node.stat.getCversion() > 0) && (node.getChildren().isEmpty())) {
candidates.add(containerPath);
}
}
for (String ttlPath : zkDb.getDataTree().getTtls()) {
DataNode node = zkDb.getDataTree().getNode(ttlPath);
if (node != null) {
Set<String> children = node.getChildren();
if (children.isEmpty()) {
if (EphemeralType.get(node.stat.getEphemeralOwner()) == EphemeralType.TTL) {
long ttl = EphemeralType.TTL.getValue(node.stat.getEphemeralOwner());
if ((ttl != 0) && (getElapsed(node) > ttl)) {
candidates.add(ttlPath);
}
}
}
}
}
return candidates;
}
// VisibleForTesting
protected long getElapsed(DataNode node) {
return Time.currentWallTime() - node.stat.getMtime();
}
}