blob: d627f4662fdd50550d7b7018dfd9b9f8bea94909 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License
package org.apache.impala.catalog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.Uninterruptibles;
import com.sun.management.GarbageCollectorMXBean;
import com.sun.management.GcInfo;
import org.apache.impala.common.Reference;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TTableName;
import org.apache.log4j.Logger;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Automatically invalidates recently unused tables. There are currently 2 rules
* implemented:
* 1. Invalidate a certain percentage of the least recently used tables after a GC with an
* almost full old generation. The fullness of the GC generation depends on the maximum
* heap size.
* 2. If invalidate_tables_timeout_s is set in the backend, unused tables older than the
* threshold are invalidated periodically.
*/
public class CatalogdTableInvalidator {
public static final Logger LOG = Logger.getLogger(CatalogdTableInvalidator.class);

/**
 * Pluggable clock used for every time reading in this class. It is static so that the
 * clock can be read without a CatalogdTableInvalidator instance, and it is left
 * non-final so tests can swap in their own Ticker.
 */
@VisibleForTesting
static Ticker TIME_SOURCE = Ticker.systemTicker();

// Upper bound, in nanoseconds, on how long the daemon thread waits between wake-ups.
private static long DAEMON_MAXIMUM_SLEEP_NANO = TimeUnit.MINUTES.toNanos(5);

// Inactivity threshold in nanoseconds for time-based invalidation. The daemon thread
// only runs time-based invalidation when this value is positive.
private final long unusedTableTtlNano_;

// True only if memory-pressure invalidation was requested AND the GC listener could be
// installed.
private final boolean invalidateTableOnMemoryPressure_;

// The catalog whose tables are invalidated.
private final CatalogServiceCatalog catalog_;

/**
 * Background thread that wakes up periodically (or when notified) and decides whether
 * tables have to be invalidated.
 */
private final Thread daemonThread_;

/**
 * Fraction of the old generation's maximum size above which the old generation is
 * treated as almost full.
 */
private final double oldGenFullThreshold_;

/**
 * Fraction of the tables that is invalidated when the old generation is almost full.
 */
private final double gcInvalidationFraction_;

/**
 * Incremented by the daemon thread after each invalidation pass, whether it was
 * triggered by memory pressure or by the timeout. Tests use it to confirm that a pass
 * actually ran.
 */
@VisibleForTesting
AtomicLong scanCount_ = new AtomicLong();

// The GC bean that owns the old generation memory pool. Assigned by
// tryInstallGcListener(); stays null if no such bean was found.
private GarbageCollectorMXBean oldGenGcBean_;

/**
 * Name of the old generation memory pool, as reported by the GC bean.
 */
private String oldGcGenName_;

/**
 * The value of oldGenGcBean_.getCollectionCount() seen the last time a new GC was
 * observed (initialised when the GC listener is installed).
 */
private long lastObservedGcCount_;

// Set by stop(); the daemon thread exits once it sees this flag. Accessed while holding
// this object's monitor.
private boolean stopped_ = false;

/**
 * Time, in nanoseconds of TIME_SOURCE, at which the last time-based invalidation ran.
 */
private long lastInvalidationTime_;
CatalogdTableInvalidator(CatalogServiceCatalog catalog, final long unusedTableTtlSec,
boolean invalidateTableOnMemoryPressure, double oldGenFullThreshold,
double gcInvalidationFraction) {
catalog_ = catalog;
unusedTableTtlNano_ = TimeUnit.SECONDS.toNanos(unusedTableTtlSec);
oldGenFullThreshold_ = oldGenFullThreshold;
gcInvalidationFraction_ = gcInvalidationFraction;
lastInvalidationTime_ = TIME_SOURCE.read();
invalidateTableOnMemoryPressure_ =
invalidateTableOnMemoryPressure && tryInstallGcListener();
daemonThread_ = new Thread(new DaemonThread());
daemonThread_.setDaemon(true);
daemonThread_.setName("CatalogTableInvalidator timer");
daemonThread_.start();
}
/**
 * Reads the table invalidation settings from 'config', validates them and creates an
 * invalidator if at least one invalidation rule is enabled.
 *
 * @param catalog the catalog handed to the new invalidator
 * @param config the backend configuration supplying the invalidation settings
 * @return a new invalidator, or null if the timeout is 0 and invalidation on memory
 *     pressure is disabled
 * @throws IllegalArgumentException if the timeout is negative or one of the two
 *     fractions lies outside [0, 1]
 */
public static CatalogdTableInvalidator create(CatalogServiceCatalog catalog,
    BackendConfig config) {
  final boolean onMemoryPressure = config.invalidateTablesOnMemoryPressure();
  final int ttlSec = config.getInvalidateTablesTimeoutS();
  final double oldGenThreshold = config.getInvalidateTablesGcOldGenFullThreshold();
  final double memPressureFraction =
      config.getInvalidateTablesFractionOnMemoryPressure();

  Preconditions.checkArgument(ttlSec >= 0,
      "invalidate_tables_timeout_s must be a non-negative integer.");

  final boolean thresholdInRange = 0 <= oldGenThreshold && oldGenThreshold <= 1;
  Preconditions.checkArgument(thresholdInRange,
      "invalidate_tables_gc_old_gen_full_threshold must be in [0, 1].");

  final boolean fractionInRange = 0 <= memPressureFraction && memPressureFraction <= 1;
  Preconditions.checkArgument(fractionInRange,
      "invalidate_tables_fraction_on_memory_pressure must be in [0, 1].");

  // Nothing to do when neither rule is switched on.
  if (ttlSec <= 0 && !onMemoryPressure) return null;

  return new CatalogdTableInvalidator(catalog, ttlSec, onMemoryPressure,
      oldGenThreshold, memPressureFraction);
}
/**
 * Returns the current reading of TIME_SOURCE, so that all time comparisons in the
 * invalidator go through the same (test-replaceable) clock.
 */
static long nanoTime() {
  final long ticks = TIME_SOURCE.read();
  return ticks;
}
/**
 * Looks through the platform GC beans for one whose memory pools include an old
 * generation pool (a pool whose name contains "Old") and registers a notification
 * listener on it. On a match it also records the bean, the pool name and the bean's
 * current collection count.
 *
 * Returns true if a listener was installed. Returns false, after logging a warning, if
 * no bean exposes an old generation pool or if a matching bean does not implement
 * NotificationEmitter.
 *
 * NOTE(review): the scan does not stop at the first match. If several beans expose an
 * old generation pool, the listener is added to each of them and the recorded bean and
 * pool name are those of the last match.
 */
private boolean tryInstallGcListener() {
  final String commonErrMsg = "Continuing without GC-triggered invalidation of tables.";
  final List<GarbageCollectorMXBean> candidates =
      java.lang.management.ManagementFactory.getPlatformMXBeans(
          GarbageCollectorMXBean.class);
  final GcNotificationListener listener = new GcNotificationListener();
  boolean installed = false;
  for (GarbageCollectorMXBean bean : candidates) {
    final String oldPool = firstOldGenPoolName(bean);
    if (oldPool == null) continue;
    if (!(bean instanceof NotificationEmitter)) {
      LOG.warn("GCBean " + bean.getClass().getName() + " is not supported " +
          "because it does not implement NotificationEmitter. " + commonErrMsg);
      return false;
    }
    oldGenGcBean_ = bean;
    oldGcGenName_ = oldPool;
    lastObservedGcCount_ = bean.getCollectionCount();
    installed = true;
    NotificationEmitter emitter = (NotificationEmitter) bean;
    emitter.addNotificationListener(listener, null, null);
  }
  if (installed) return true;
  LOG.warn("Cannot find old generation memory pool in the GC beans. " + commonErrMsg);
  return false;
}

/**
 * Returns the first memory pool name of 'gcbean' that contains "Old", or null if the
 * bean has no such pool.
 */
private static String firstOldGenPoolName(GarbageCollectorMXBean gcbean) {
  for (String poolName : gcbean.getMemoryPoolNames()) {
    if (poolName.contains("Old")) return poolName;
  }
  return null;
}
/**
 * Detects whether a GC happened since the last observation and whether the old
 * generation is still loaded more than the configured threshold after that GC. If so,
 * returns true, indicating that tables should be evicted because of memory pressure.
 *
 * Returns false when memory-pressure invalidation is disabled, when no new GC was
 * observed, when the GC bean has no information about the last GC, or when the old
 * generation pool does not report a maximum size.
 *
 * @throws IllegalStateException if the last GC info has no entry for the old
 *     generation pool
 */
private boolean shouldEvictFromFullHeapAfterGc() {
  if (!invalidateTableOnMemoryPressure_) return false;
  long gcCount = oldGenGcBean_.getCollectionCount();
  if (gcCount <= lastObservedGcCount_) return false;
  // Remember the count right away so that the same GC is never evaluated twice.
  lastObservedGcCount_ = gcCount;
  GcInfo lastGcInfo = oldGenGcBean_.getLastGcInfo();
  if (lastGcInfo == null) {
    LOG.warn("gcBean.getLastGcInfo() returned null. Table invalidation based on " +
        "memory pressure was skipped.");
    return false;
  }
  MemoryUsage tenuredGenUsage = lastGcInfo.getMemoryUsageAfterGc().get(oldGcGenName_);
  Preconditions.checkState(tenuredGenUsage != null);
  long maxBytes = tenuredGenUsage.getMax();
  if (maxBytes < 0) {
    // MemoryUsage.getMax() reports -1 when the pool has no defined maximum. Comparing
    // against a negative limit would make every GC look like memory pressure, so skip
    // the eviction instead.
    LOG.warn("The maximum size of memory pool " + oldGcGenName_ + " is undefined. " +
        "Table invalidation based on memory pressure was skipped.");
    return false;
  }
  return maxBytes * oldGenFullThreshold_ < tenuredGenUsage.getUsed();
}
/**
 * Invalidates the least recently used tables of the catalog. All tables that are not
 * IncompleteTable instances are collected and ordered by ascending last-used time;
 * tables are then invalidated from the front of that order while the number already
 * invalidated is below (number of collected tables * invalidationFraction).
 *
 * @param invalidationFraction the fraction of the collected tables to invalidate
 */
private void invalidateSome(double invalidationFraction) {
  List<Table> candidates = new ArrayList<>();
  for (Db db : catalog_.getAllDbs()) {
    for (Table tbl : db.getTables()) {
      if (tbl instanceof IncompleteTable) continue;
      candidates.add(tbl);
    }
  }
  // TODO: use quick select
  final Comparator<Table> leastRecentlyUsedFirst = new Comparator<Table>() {
    @Override
    public int compare(Table left, Table right) {
      return Long.compare(left.getLastUsedTime(), right.getLastUsedTime());
    }
  };
  Collections.sort(candidates, leastRecentlyUsedFirst);

  // The bound is kept as a double so that any fractional part still admits one more
  // table, exactly as an "index < size * fraction" test does.
  final double limit = candidates.size() * invalidationFraction;
  int idx = 0;
  while (idx < limit) {
    final Table victim = candidates.get(idx);
    TTableName tTableName = victim.getTableName().toThrift();
    Reference<Boolean> tblWasRemoved = new Reference<>();
    Reference<Boolean> dbWasAdded = new Reference<>();
    catalog_.invalidateTable(tTableName, tblWasRemoved, dbWasAdded);
    LOG.info("Table " + victim.getFullName() + " invalidated due to memory " +
        "pressure.");
    idx++;
  }
}
/**
 * Invalidates every table that has not been used for longer than 'retireAgeNano'.
 * IncompleteTable instances are skipped. The current time is read once up front and
 * used for all tables.
 *
 * @param retireAgeNano inactivity, in nanoseconds, beyond which a table is invalidated
 */
private void invalidateOlderThan(long retireAgeNano) {
  final long nowNano = TIME_SOURCE.read();
  for (Db db : catalog_.getAllDbs()) {
    for (Table tbl : catalog_.getAllTables(db)) {
      if (tbl instanceof IncompleteTable) continue;
      final long idleNano = nowNano - tbl.getLastUsedTime();
      if (idleNano > retireAgeNano) {
        Reference<Boolean> tblWasRemoved = new Reference<>();
        Reference<Boolean> dbWasAdded = new Reference<>();
        TTableName tTableName = tbl.getTableName().toThrift();
        catalog_.invalidateTable(tTableName, tblWasRemoved, dbWasAdded);
        final long idleSec = TimeUnit.NANOSECONDS.toSeconds(idleNano);
        LOG.info("Invalidated " + tbl.getFullName() + " due to inactivity for " +
            idleSec + " seconds.");
      }
    }
  }
}
/**
 * Asks the daemon thread to exit and waits for it to terminate. The stop flag is set
 * and the daemon thread is notified while holding this object's monitor, so a waiting
 * daemon thread wakes up and sees the flag.
 *
 * If the calling thread is interrupted while waiting, a warning is logged, the wait is
 * abandoned and the thread's interrupt status is restored so callers can still observe
 * the interruption.
 */
void stop() {
  synchronized (this) {
    stopped_ = true;
    notify();
  }
  try {
    daemonThread_.join();
  } catch (InterruptedException e) {
    LOG.warn("stop() is interrupted", e);
    // Catching InterruptedException clears the interrupt flag; set it again rather
    // than silently swallowing the interruption.
    Thread.currentThread().interrupt();
  }
}
/**
 * Wakes up a thread waiting on this object's monitor (the daemon thread waits on it
 * between scans), so tests do not have to wait for the timeout to elapse.
 */
@VisibleForTesting
void wakeUpForTests() {
  synchronized (this) {
    this.notify();
  }
}
/**
 * Body of the background thread. In a loop, and while holding the enclosing object's
 * monitor, it:
 * 1. exits if stop() was called,
 * 2. invalidates a fraction of the tables if a GC left the old generation almost full,
 * 3. runs the time-based invalidation if it is enabled and due,
 * 4. waits until it is notified or the sleep time elapses.
 * Any exception is logged and the loop resumes after 5 seconds.
 */
private class DaemonThread implements Runnable {
  @Override
  public void run() {
    // A tenth of the TTL, capped at DAEMON_MAXIMUM_SLEEP_NANO. It is 0 when time-based
    // invalidation is disabled.
    long sleepTimeNano = Math.min(unusedTableTtlNano_ / 10, DAEMON_MAXIMUM_SLEEP_NANO);
    while (true) {
      try {
        synchronized (CatalogdTableInvalidator.this) {
          if (stopped_) return;
          if (shouldEvictFromFullHeapAfterGc()) {
            invalidateSome(gcInvalidationFraction_);
            scanCount_.incrementAndGet();
          }
          long now = nanoTime();
          // Run the time-based invalidation if it is enabled and at least one sleep
          // interval has passed since the previous run.
          if (unusedTableTtlNano_ > 0 && now >= lastInvalidationTime_ + sleepTimeNano) {
            invalidateOlderThan(unusedTableTtlNano_);
            lastInvalidationTime_ = now;
            scanCount_.incrementAndGet();
          }
          if (sleepTimeNano > 0) {
            // Time-based invalidation is enabled: wake up again after the interval
            // unless notified earlier.
            TimeUnit.NANOSECONDS.timedWait(CatalogdTableInvalidator.this, sleepTimeNano);
          } else {
            // TimeUnit.timedWait() does not wait at all for a non-positive timeout,
            // which would turn this loop into a busy spin. Block until a GC
            // notification, stop() or wakeUpForTests() notifies this monitor.
            CatalogdTableInvalidator.this.wait();
          }
        }
      } catch (Exception e) {
        LOG.warn("Unexpected exception thrown while attempting to automatically " +
            "invalidate tables. Will retry in 5 seconds.", e);
        Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
      }
    }
  }
}
/**
 * Listener registered on the GC bean. Every notification wakes up a thread waiting on
 * the enclosing invalidator's monitor; the notification content itself is ignored.
 */
private class GcNotificationListener implements NotificationListener {
  @Override
  public void handleNotification(Notification notification, Object handback) {
    final CatalogdTableInvalidator owner = CatalogdTableInvalidator.this;
    synchronized (owner) {
      owner.notify();
    }
  }
}
}