blob: 4f74504030ad5fddb07c7a354ad4cc184ec25811 [file] [log] [blame]
/*-
* Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
*
* This file was distributed by Oracle as part of a version of Oracle Berkeley
* DB Java Edition made available at:
*
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
*
* Please see the LICENSE file included in the top-level directory of the
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
* license and additional information.
*/
package com.sleepycat.je.cleaner;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.dbi.TTL;
import com.sleepycat.je.log.LogEntryType;
import com.sleepycat.je.log.entry.INLogEntry;
import com.sleepycat.je.log.entry.LNLogEntry;
import com.sleepycat.je.log.entry.LogEntry;
import com.sleepycat.je.tree.BIN;
import com.sleepycat.je.tree.Key;
/**
 * Tracks the expired bytes in each time window, i.e., a histogram. A separate
 * ExpirationTracker instance is used for each tracked data file.
 * <p>
 * A copy-on-write approach is used to store an expiration time-to-counter
 * mapping, and AtomicIntegers are used for the counters. This avoids blocking
 * when tracking information for the current end-of-log file. That way, the
 * end-of-log tracker can be used by multiple threads without holding a global
 * mutex. This tracker is maintained by the LogManager and a new tracker is
 * created for each file, and then flushed to disk when starting a new file as
 * a FileExpirationLN.
 * <p>
 * An ExpirationTracker instance is also used to track expired data when
 * performing the first pass of two pass cleaning, although in that case it is
 * only used by one thread, so the optimizations are irrelevant.
 * <p>
 * The {@link #serialize} method is called to represent the histogram in a
 * single byte array. This array is the record "data" in a FileExpirationLN.
 * It is also stored in memory, in the UtilizationProfile, and used during
 * cleaning to calculate the number of expired bytes per file.
 */
public class ExpirationTracker {

    /** The data file whose expiration histogram is tracked. */
    private final long fileNum;

    /*
     * Copy-on-write map of expiration time (in hours) to byte counter. A map
     * instance is never modified after it has been assigned to this field;
     * adding a new key builds and installs a new map while synchronized.
     */
    private volatile Map<Integer, AtomicInteger> map = new HashMap<>();

    /**
     * Number of calls to {@link #track(LogEntry, int)} that have been
     * announced via {@link #incrementPendingTrackCalls} but have not yet
     * finished updating the counters. We wait for this to go to zero before
     * flushing the tracker to its database.
     */
    private final AtomicInteger pendingTrackCalls = new AtomicInteger(0);

    /**
     * @param fileNum the data file whose expired bytes will be tracked.
     */
    public ExpirationTracker(final long fileNum) {
        this.fileNum = fileNum;
    }

    /**
     * @return the data file number passed to the constructor.
     */
    public long getFileNum() {
        return fileNum;
    }

    /**
     * Tracks expiration of a BIN or LN.
     * <p>
     * User LNs with a non-zero expiration are counted with their full size.
     * For BINs and BIN-deltas, the size is divided evenly among the slots and
     * each slot with a non-zero expiration is counted with that share. Other
     * entry types are ignored.
     * <p>
     * The pending-call counter is decremented only after the counters have
     * been updated, on every exit path, so that {@link #hasPendingTrackCalls}
     * cannot report zero while this call's bytes are still unaccounted for.
     *
     * @param entry is the LogEntry that was just logged. INs and LNs will be
     * processed here, and must be protected by their parent latch.
     *
     * @param size byte size of logged entry.
     */
    public void track(final LogEntry entry, final int size) {
        try {
            final LogEntryType type = entry.getLogType();

            if (type.isUserLNType()) {
                final LNLogEntry<?> lnEntry = (LNLogEntry<?>) entry;
                final int expiration = lnEntry.getExpiration();
                if (expiration != 0) {
                    track(expiration, lnEntry.isExpirationInHours(), size);
                }
                return;
            }

            if (!type.equals(LogEntryType.LOG_BIN) &&
                !type.equals(LogEntryType.LOG_BIN_DELTA)) {
                return;
            }

            final INLogEntry<?> inEntry = (INLogEntry<?>) entry;
            final BIN bin = inEntry.getBINWithExpiration();
            if (bin == null) {
                return;
            }

            final int nEntries = bin.getNEntries();
            if (nEntries == 0) {
                /* No slots to attribute bytes to; avoid dividing by zero. */
                return;
            }

            final boolean inHours = bin.isExpirationInHours();
            final int entrySize = size / nEntries;

            for (int i = 0; i < nEntries; i += 1) {
                final int expiration = bin.getExpiration(i);
                if (expiration != 0) {
                    track(expiration, inHours, entrySize);
                }
            }
        } finally {
            pendingTrackCalls.decrementAndGet();
        }
    }

    /**
     * Adds a single expiration value.
     *
     * @param expiration expiration time, in hours or days.
     * @param expirationInHours true if expiration is in hours, false if days.
     * @param size number of bytes to add to the counter for this time.
     */
    private void track(final int expiration,
                       final boolean expirationInHours,
                       final int size) {

        final Integer expInHours =
            expirationInHours ? expiration : (24 * expiration);

        AtomicInteger counter = map.get(expInHours);

        /*
         * The map is modified only while synchronized, which prevents two
         * threads from adding the same entry or a reader thread from accessing
         * the map while it is being modified. To guarantee this we must
         * "install" the new map in the volatile field only after adding the
         * new counter.
         */
        if (counter == null) {
            synchronized (this) {

                /*
                 * Check again while synchronized, since another thread may
                 * have added it. This "double check" is safe because the 'map'
                 * field is volatile.
                 */
                counter = map.get(expInHours);

                if (counter == null) {
                    final Map<Integer, AtomicInteger> newMap =
                        new HashMap<>(map);
                    counter = new AtomicInteger(0);
                    newMap.put(expInHours, counter);
                    map = newMap;
                }
            }
        }

        counter.addAndGet(size);
    }

    /**
     * Increment the number of calls to {@link #track(LogEntry, int)}
     * that must be made before the tracked data can be flushed to its
     * database.
     */
    public void incrementPendingTrackCalls() {
        pendingTrackCalls.incrementAndGet();
    }

    /**
     * Returns whether to wait for outstanding calls to {@link
     * #track(LogEntry, int)} before flushing the tracked data to its
     * database.
     */
    boolean hasPendingTrackCalls() {
        return pendingTrackCalls.get() > 0;
    }

    /**
     * Computes the current expired bytes for the given time.
     *
     * @param time a time in milliseconds; it is converted to hours using
     * TTL.MILLIS_PER_HOUR.
     * @return the sum of bytes whose expiration hour is less than or equal
     * to the given time in hours.
     */
    public int getExpiredBytes(final long time) {

        final int expLimit = (int) (time / TTL.MILLIS_PER_HOUR);
        int expiredSize = 0;

        for (final Map.Entry<Integer, AtomicInteger> entry : map.entrySet()) {

            final int exp = entry.getKey();

            if (exp > expLimit) {
                continue;
            }

            expiredSize += entry.getValue().get();
        }

        return expiredSize;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder();
        sb.append("{ExpTracker file= ").append(fileNum);

        for (final Map.Entry<Integer, AtomicInteger> entry :
             new TreeMap<>(map).entrySet()) {

            final int exp = entry.getKey();
            sb.append(' ').append(TTL.formatExpiration(exp, true));
            sb.append('=').append(entry.getValue().get());
        }

        sb.append('}');
        return sb.toString();
    }

    /**
     * Formats a histogram produced by {@link #serialize} for debugging.
     * An empty array (no expiring data) yields an empty histogram.
     */
    public static String toString(final byte[] serializedForm) {
        final StringBuilder sb = new StringBuilder();
        sb.append("{ExpSerialized");

        if (serializedForm.length > 0) {
            final TupleInput in = new TupleInput(
                serializedForm, 1, serializedForm.length - 1);

            final boolean hours = isExpirationInHours(serializedForm);
            int prevExp = 0;

            while (in.available() > 0) {
                final int exp = in.readPackedInt() + prevExp;
                final int size = in.readPackedInt();
                sb.append(' ').append(TTL.formatExpiration(exp, hours));
                sb.append('=').append(size);
                prevExp = exp;
            }
        }

        sb.append('}');
        return sb.toString();
    }

    /**
     * Computes the expired bytes for the given serialized histogram and
     * expiration time.
     *
     * @param serializedForm a histogram produced by {@link #serialize}; an
     * empty array means no data expires and yields zero.
     * @param dayLimit the limit to use if the histogram is in days.
     * @param hourLimit the limit to use if the histogram is in hours.
     * @return the sum of bytes whose expiration is less than or equal to
     * the applicable limit.
     */
    static int getExpiredBytes(final byte[] serializedForm,
                               final int dayLimit,
                               final int hourLimit) {

        if (serializedForm.length == 0) {
            return 0;
        }

        final int expLimit =
            ExpirationTracker.isExpirationInHours(serializedForm) ?
            hourLimit : dayLimit;

        final TupleInput in = new TupleInput(
            serializedForm, 1, serializedForm.length - 1);

        int expiredSize = 0;
        int prevExp = 0;

        while (in.available() > 0) {

            final int exp = in.readPackedInt() + prevExp;

            /* Values are sorted, so nothing after this point has expired. */
            if (exp > expLimit) {
                break;
            }

            expiredSize += in.readPackedInt();
            prevExp = exp;
        }

        return expiredSize;
    }

    /**
     * Converts this object to a serialized form that is compact and can be
     * used to quickly compute the total bytes that have expired as of a given
     * time. Returns an empty array if no data in this file has an expiration
     * time.
     * <p>
     * The first byte is a flag: 1 if the values are in hours, 0 if they are
     * in days. It is followed by a series of {interval,byteSize} pairs that
     * is ordered by expiration time and run length encoded. The interval and
     * byteSize are packed integers. The interval is the delta between the
     * current and previous expiration value. All expiration values are in
     * days if all values are on a day boundary; otherwise they are in hours.
     * Days are used, when possible, to reduce the size of the delta, using
     * less space due to the packed integer format.
     */
    byte[] serialize() {

        /* Read the volatile field once so one consistent map is used. */
        final Map<Integer, AtomicInteger> myMap = map;

        if (myMap.isEmpty()) {
            return Key.EMPTY_KEY;
        }

        final List<Integer> expList = new ArrayList<>(myMap.keySet());
        Collections.sort(expList);

        boolean hours = false;
        for (final int exp : expList) {
            if (exp % 24 != 0) {
                hours = true;
                break;
            }
        }

        final TupleOutput out = new TupleOutput();
        out.write(hours ? 1 : 0);

        int prevExp = 0;
        for (int exp : expList) {
            final AtomicInteger counter = myMap.get(exp);
            if (!hours) {
                exp /= 24;
            }
            out.writePackedInt(exp - prevExp);
            out.writePackedInt(counter.get());
            prevExp = exp;
        }

        return out.toByteArray();
    }

    /**
     * Returns whether the given serialized form has expired values in hours.
     * If false is returned, all values expired on day boundaries (or the
     * form is empty, meaning no data expires).
     */
    static boolean isExpirationInHours(final byte[] serialized) {
        return serialized.length > 0 && serialized[0] == 1;
    }
}