blob: 37c258f4320029629a7a1573c6cbe8aa68e13e65 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.Map.Entry;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Contains the logic to look up a leveldb by timestamp so that multiple smaller
* databases can roll according to the configured period and be evicted
* efficiently via operating system directory removal.
*/
class RollingLevelDB {
/** Logger for this class. */
private static final Logger LOG =
LoggerFactory.getLogger(RollingLevelDB.class);
/** Factory to open and create new leveldb instances. */
private static JniDBFactory factory = new JniDBFactory();
/**
* Thread safe date formatter. Renders a start time as the date suffix of a
* rolling leveldb directory name, using the rolling period's pattern in GMT.
*/
private FastDateFormat fdf;
/**
* Date parser. Recovers a start time from a directory name suffix; it is
* configured with the same pattern and time zone as the formatter.
*/
private SimpleDateFormat sdf;
/**
* Calendar to calculate the current and next rolling period. It is a shared
* mutable GMT calendar, so it is only touched from synchronized code.
*/
private GregorianCalendar cal = new GregorianCalendar(
TimeZone.getTimeZone("GMT"));
/**
* Collection of all active rolling leveldb instances, keyed by the start time
* in milliseconds of the period each instance covers.
*/
private final TreeMap<Long, DB> rollingdbs;
/**
* Collection of all rolling leveldb instances to evict, keyed by start time.
* Entries stay here until they are closed and their directories deleted.
*/
private final TreeMap<Long, DB> rollingdbsToEvict;
/** Name of this rolling level db; also the prefix of its directory names. */
private final String name;
/**
* Calculated timestamp of when to roll a new leveldb instance. A lookup whose
* start time is at or after this value triggers a roll.
*/
private volatile long nextRollingCheckMillis = 0;
/** File system instance to find and create new leveldb instances. */
private FileSystem lfs = null;
/** Directory to store rolling leveldb instances. */
private Path rollingDBPath;
/** Configuration for this object. */
private Configuration conf;
/** Rolling period. */
private RollingPeriod rollingPeriod;
/**
* Rolling leveldb instances are evicted when their endtime is earlier than
* the current time minus the time to live value.
*/
private long ttl;
/** Whether time to live is enabled. */
private boolean ttlEnabled;
/**
 * Supported rolling periods, each paired with the date pattern used to format
 * and parse the suffix of its leveldb directory names.
 */
enum RollingPeriod {
  /** Day-resolution pattern. */
  DAILY("yyyy-MM-dd"),
  /** Hour-resolution pattern. */
  HALF_DAILY("yyyy-MM-dd-HH"),
  /** Hour-resolution pattern. */
  QUARTER_DAILY("yyyy-MM-dd-HH"),
  /** Hour-resolution pattern. */
  HOURLY("yyyy-MM-dd-HH"),
  /** Minute-resolution pattern. */
  MINUTELY("yyyy-MM-dd-HH-mm");

  /** Date pattern associated with this period. */
  private final String pattern;

  RollingPeriod(final String pattern) {
    this.pattern = pattern;
  }

  /**
   * @return the date pattern for this rolling period
   */
  public String dateFormat() {
    return pattern;
  }
}
/**
 * Pairs a write batch with the rolling leveldb instance it is written to, so
 * that both can be handed around as a single unit.
 */
public static class RollingWriteBatch {
  /** Leveldb instance the batch is written to. */
  private final DB db;
  /** Batch of pending writes for the db object. */
  private final WriteBatch writeBatch;

  /**
   * @param db leveldb instance that {@link #write()} writes to
   * @param writeBatch batch of pending writes for {@code db}
   */
  public RollingWriteBatch(final DB db, final WriteBatch writeBatch) {
    this.writeBatch = writeBatch;
    this.db = db;
  }

  /**
   * @return the leveldb instance held by this object
   */
  public DB getDB() {
    return this.db;
  }

  /**
   * @return the write batch held by this object
   */
  public WriteBatch getWriteBatch() {
    return this.writeBatch;
  }

  /** Writes the batch to the leveldb instance. */
  public void write() {
    this.db.write(this.writeBatch);
  }

  /** Closes the write batch, logging rather than propagating failures. */
  public void close() {
    IOUtils.cleanupWithLogger(LOG, this.writeBatch);
  }
}
/**
 * Creates a rolling leveldb with empty active and eviction collections. No
 * database is opened and no configuration is read here.
 *
 * @param name name of this rolling leveldb
 */
RollingLevelDB(String name) {
  this.rollingdbsToEvict = new TreeMap<Long, DB>();
  this.rollingdbs = new TreeMap<Long, DB>();
  this.name = name;
}
/**
 * @return the name this rolling leveldb was constructed with
 */
protected String getName() {
  return this.name;
}
/**
 * Supplies the current time for the rolling and eviction calculations.
 * Protected and non-final, so a subclass may override it.
 *
 * @return the value of {@link System#currentTimeMillis()}
 */
protected long currentTimeMillis() {
  final long nowMillis = System.currentTimeMillis();
  return nowMillis;
}
/**
 * @return the timestamp, in milliseconds, at or after which a lookup rolls a
 *         new leveldb instance
 */
public long getNextRollingTimeMillis() {
  return this.nextRollingCheckMillis;
}
/**
 * @return the configured time to live value
 */
public long getTimeToLive() {
  return this.ttl;
}
/**
 * @return whether time to live based eviction is enabled
 */
public boolean getTimeToLiveEnabled() {
  return this.ttlEnabled;
}
/**
 * Records the next rolling time and logs it in the rolling period's date
 * format.
 *
 * @param timestamp next rolling time in milliseconds
 */
protected void setNextRollingTimeMillis(final long timestamp) {
  this.nextRollingCheckMillis = timestamp;
  final String formattedRollingTime = fdf.format(nextRollingCheckMillis);
  LOG.info("Next rolling time for " + getName() + " is "
      + formattedRollingTime);
}
/**
 * Reads the time to live settings and the leveldb root path from the
 * configuration, then prepares the file system, the rolling period and any
 * leveldb instances already present on disk.
 *
 * @param config configuration to read settings from
 * @throws Exception if any initialization step fails
 */
public void init(final Configuration config) throws Exception {
  LOG.info("Initializing RollingLevelDB for " + getName());
  this.conf = config;

  final long configuredTtl = conf.getLong(
      YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
  this.ttl = configuredTtl;

  final boolean configuredTtlEnabled = conf.getBoolean(
      YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true);
  this.ttlEnabled = configuredTtlEnabled;

  final String leveldbRoot =
      conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH);
  this.rollingDBPath =
      new Path(leveldbRoot, RollingLevelDBTimelineStore.FILENAME);

  initFileSystem();
  initRollingPeriod();
  initHistoricalDBs();
}
/**
 * Obtains the local file system and creates the root directory for the
 * rolling leveldb instances.
 *
 * @throws IOException if the root directory cannot be created
 */
protected void initFileSystem() throws IOException {
  lfs = FileSystem.getLocal(conf);
  final boolean created = lfs.mkdirs(rollingDBPath,
      RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK);
  if (created) {
    return;
  }
  throw new IOException("Failed to create leveldb root directory "
      + rollingDBPath);
}
/**
 * Resolves the configured rolling period and builds the GMT formatter and
 * parser for that period's date pattern.
 */
protected synchronized void initRollingPeriod() {
  final String configuredPeriod = conf.get(
      YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD,
      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD);
  // Enum constants are upper case; the configured value need not be.
  final String periodName = configuredPeriod.toUpperCase(Locale.ENGLISH);
  this.rollingPeriod = RollingPeriod.valueOf(periodName);

  final String pattern = rollingPeriod.dateFormat();
  fdf = FastDateFormat.getInstance(pattern, TimeZone.getTimeZone("GMT"));
  sdf = new SimpleDateFormat(pattern);
  // Parse in the same zone that names are formatted in.
  sdf.setTimeZone(fdf.getTimeZone());
}
/**
 * Opens every leveldb instance already on disk for this rolling leveldb.
 * Directories are matched by the pattern {@code <name>.*}; the part after the
 * last dot is parsed as the instance's start time. Directories whose suffix
 * does not parse are skipped with a warning.
 *
 * @throws IOException if listing the leveldb root directory fails
 */
protected synchronized void initHistoricalDBs() throws IOException {
  Path rollingDBGlobPath = new Path(rollingDBPath, getName() + ".*");
  FileStatus[] statuses = lfs.globStatus(rollingDBGlobPath);
  if (statuses == null) {
    // globStatus is documented as able to return null instead of an empty
    // array; treat that as "nothing to restore".
    return;
  }
  for (FileStatus status : statuses) {
    String dbName = FilenameUtils.getExtension(status.getPath().toString());
    try {
      Long dbStartTime = sdf.parse(dbName).getTime();
      initRollingLevelDB(dbStartTime, status.getPath());
    } catch (ParseException pe) {
      // Keep the cause: it says where in the suffix parsing failed.
      LOG.warn("Failed to initialize rolling leveldb " + dbName + " for "
          + getName(), pe);
    }
  }
}
/**
 * Opens (creating if missing) the leveldb instance at the given path and
 * registers it under the given start time. Does nothing if an instance is
 * already registered for that start time. A failure to open is logged and
 * leaves the collection unchanged.
 *
 * @param dbStartTime start time in milliseconds to register the instance under
 * @param rollingInstanceDBPath directory of the leveldb instance
 */
private void initRollingLevelDB(Long dbStartTime,
    Path rollingInstanceDBPath) {
  if (rollingdbs.containsKey(dbStartTime)) {
    return;
  }

  final long readCacheSize = conf.getLong(
      YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE);
  final int maxOpenFiles = conf.getInt(
      YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES);
  final int writeBufferSize = conf.getInt(
      YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE);

  final Options options = new Options();
  options.createIfMissing(true);
  options.cacheSize(readCacheSize);
  options.maxOpenFiles(maxOpenFiles);
  options.writeBufferSize(writeBufferSize);

  LOG.info("Initializing rolling leveldb instance :" + rollingInstanceDBPath
      + " for start time: " + dbStartTime);

  final File dbDirectory = new File(rollingInstanceDBPath.toUri().getPath());
  try {
    final DB opened = factory.open(dbDirectory, options);
    rollingdbs.put(dbStartTime, opened);
    LOG.info("Added rolling leveldb instance " + fdf.format(dbStartTime)
        + " to " + getName());
  } catch (IOException ioe) {
    LOG.warn("Failed to open rolling leveldb instance :" + dbDirectory, ioe);
  }
}
/**
 * Finds the active leveldb instance that immediately precedes the given one
 * in start time order.
 *
 * @param db instance to look for, compared by identity
 * @return the preceding instance; {@code null} if {@code db} is the earliest
 *         instance or there are no instances; the latest instance if
 *         {@code db} is not among the active instances
 */
synchronized DB getPreviousDB(DB db) {
  DB previous = null;
  for (DB candidate : rollingdbs.values()) {
    if (candidate == db) {
      return previous;
    }
    previous = candidate;
  }
  return previous;
}
/**
 * Looks up the start time under which the given leveldb instance is
 * registered as active.
 *
 * @param db instance to look for, compared by identity
 * @return the start time in milliseconds, or {@code -1} if {@code db} is not
 *         an active instance; if it were registered more than once, the
 *         latest start time is returned
 */
synchronized long getStartTimeFor(DB db) {
  // Scanning from the latest entry and stopping at the first hit yields the
  // same answer as scanning everything and keeping the last hit.
  for (Entry<Long, DB> candidate : rollingdbs.descendingMap().entrySet()) {
    if (candidate.getValue() == db) {
      return candidate.getKey();
    }
  }
  return -1;
}
/**
 * Returns the active leveldb instance covering the given start time, rolling
 * a new instance first if the time has reached the next rolling time.
 *
 * @param startTime requested start time in milliseconds; values in the future
 *        are capped at the current time
 * @return the instance with the greatest start time not after the requested
 *         time, or {@code null} if there is none
 */
public synchronized DB getDBForStartTime(long startTime) {
  // make sure we sanitize this input
  final long boundedStartTime = Math.min(startTime, currentTimeMillis());
  if (boundedStartTime >= getNextRollingTimeMillis()) {
    roll(boundedStartTime);
  }
  final Entry<Long, DB> floor = rollingdbs.floorEntry(boundedStartTime);
  return floor == null ? null : floor.getValue();
}
/**
 * Rolls to the period containing the given time: records the next rolling
 * time, schedules expired instances for eviction when time to live is
 * enabled, and opens the leveldb instance for the period.
 *
 * @param startTime time in milliseconds that falls in the period to roll to
 */
private void roll(long startTime) {
  LOG.info("Rolling new DB instance for " + getName());
  final long periodStart = computeCurrentCheckMillis(startTime);
  setNextRollingTimeMillis(computeNextCheckMillis(periodStart));

  // Instance directories are named <name>.<formatted period start>.
  final String instanceSuffix = fdf.format(periodStart);
  final Path instancePath =
      new Path(rollingDBPath, getName() + "." + instanceSuffix);

  if (getTimeToLiveEnabled()) {
    scheduleOldDBsForEviction();
  }
  initRollingLevelDB(periodStart, instancePath);
}
/**
 * Moves every active leveldb instance whose start time is before the eviction
 * threshold from the active collection to the eviction collection. The
 * threshold is the start of the rolling period containing the current time
 * minus the time to live. Instances are only scheduled here; they are not
 * closed or deleted.
 */
private synchronized void scheduleOldDBsForEviction() {
  // keep at least time to live amount of data
  long evictionThreshold = computeCurrentCheckMillis(currentTimeMillis()
      - getTimeToLive());

  LOG.info("Scheduling " + getName() + " DBs older than "
      + fdf.format(evictionThreshold) + " for eviction");
  // headMap excludes the threshold itself, i.e. "start time < threshold".
  // It is a view, so removing through its iterator removes from rollingdbs.
  Iterator<Entry<Long, DB>> iterator =
      rollingdbs.headMap(evictionThreshold).entrySet().iterator();
  while (iterator.hasNext()) {
    Entry<Long, DB> entry = iterator.next();
    // Read the entry before removing it: the contents of a Map.Entry are
    // undefined once the backing map has been modified.
    final Long dbStartTime = entry.getKey();
    final DB db = entry.getValue();
    LOG.info("Scheduling " + getName() + " eviction for "
        + fdf.format(dbStartTime));
    iterator.remove();
    rollingdbsToEvict.put(dbStartTime, db);
  }
}
/**
 * Closes every leveldb instance scheduled for eviction and recursively
 * deletes its directory. A failed delete is logged; the instance is dropped
 * from the eviction collection either way.
 */
public synchronized void evictOldDBs() {
  LOG.info("Evicting " + getName() + " DBs scheduled for eviction");
  for (Iterator<Entry<Long, DB>> pending =
      rollingdbsToEvict.entrySet().iterator(); pending.hasNext();) {
    final Entry<Long, DB> expired = pending.next();
    // Close the instance before removing its files.
    IOUtils.cleanupWithLogger(LOG, expired.getValue());
    final Path expiredPath = new Path(rollingDBPath,
        getName() + "." + fdf.format(expired.getKey()));
    try {
      LOG.info("Removing old db directory contents in " + expiredPath);
      lfs.delete(expiredPath, true);
    } catch (IOException ioe) {
      LOG.warn("Failed to evict old db " + expiredPath, ioe);
    }
    pending.remove();
  }
}
/**
 * Closes every leveldb instance this object still holds, both the active ones
 * and those scheduled for eviction but not yet evicted, and then closes the
 * file system handle. Close failures are logged rather than thrown.
 * Synchronized so the collections are not iterated while another thread is
 * rolling or evicting.
 *
 * @throws Exception declared for callers; nothing in this method throws a
 *         checked exception
 */
public synchronized void stop() throws Exception {
  for (DB db : rollingdbs.values()) {
    IOUtils.cleanupWithLogger(LOG, db);
  }
  // Instances waiting for eviction are still open; close them as well so
  // their handles are not leaked on shutdown.
  for (DB db : rollingdbsToEvict.values()) {
    IOUtils.cleanupWithLogger(LOG, db);
  }
  IOUtils.cleanupWithLogger(LOG, lfs);
}
/**
 * @param now time in milliseconds
 * @return the start, in milliseconds, of the rolling period following the one
 *         that contains {@code now}
 */
private long computeNextCheckMillis(long now) {
  final boolean advanceToNextPeriod = true;
  return computeCheckMillis(now, advanceToNextPeriod);
}
/**
 * @param now time in milliseconds
 * @return the start, in milliseconds, of the rolling period that contains
 *         {@code now}
 */
public long computeCurrentCheckMillis(long now) {
  final boolean advanceToNextPeriod = false;
  return computeCheckMillis(now, advanceToNextPeriod);
}
/**
 * Rounds a time down to the start of its rolling period in GMT, optionally
 * advancing to the start of the following period. Periods are one day, 12
 * hours, 6 hours, one hour, or 5 minutes depending on the rolling period. If
 * no rolling period is set, only seconds and milliseconds are cleared.
 *
 * @param now time in milliseconds
 * @param next {@code true} to return the start of the following period,
 *        {@code false} for the start of the period containing {@code now}
 * @return the computed period start in milliseconds
 */
private synchronized long computeCheckMillis(long now, boolean next) {
  // needs to be called synchronously due to shared Calendar
  cal.setTimeInMillis(now);
  cal.set(Calendar.SECOND, 0);
  cal.set(Calendar.MILLISECOND, 0);

  if (rollingPeriod != null) {
    switch (rollingPeriod) {
    case DAILY:
      cal.set(Calendar.HOUR_OF_DAY, 0);
      cal.set(Calendar.MINUTE, 0);
      if (next) {
        cal.add(Calendar.DATE, 1);
      }
      break;
    case HALF_DAILY:
      // round down to 12 hour interval (00:00 or 12:00)
      cal.set(Calendar.HOUR_OF_DAY,
          (cal.get(Calendar.HOUR_OF_DAY) / 12) * 12);
      cal.set(Calendar.MINUTE, 0);
      if (next) {
        cal.add(Calendar.HOUR_OF_DAY, 12);
      }
      break;
    case QUARTER_DAILY:
      // round down to 6 hour interval (00:00, 06:00, 12:00 or 18:00)
      cal.set(Calendar.HOUR_OF_DAY,
          (cal.get(Calendar.HOUR_OF_DAY) / 6) * 6);
      cal.set(Calendar.MINUTE, 0);
      if (next) {
        cal.add(Calendar.HOUR_OF_DAY, 6);
      }
      break;
    case HOURLY:
      cal.set(Calendar.MINUTE, 0);
      if (next) {
        cal.add(Calendar.HOUR_OF_DAY, 1);
      }
      break;
    case MINUTELY:
      // round down to 5 minute interval
      cal.set(Calendar.MINUTE, (cal.get(Calendar.MINUTE) / 5) * 5);
      if (next) {
        cal.add(Calendar.MINUTE, 5);
      }
      break;
    default:
      break;
    }
  }
  return cal.getTimeInMillis();
}
}