/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.hadoop.yarn.server.timeline; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.text.ParseException; |
| import java.text.SimpleDateFormat; |
| import java.util.Calendar; |
| import java.util.GregorianCalendar; |
| import java.util.Iterator; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.TimeZone; |
| import java.util.TreeMap; |
| import java.util.Map.Entry; |
| |
| import org.apache.commons.io.FilenameUtils; |
| import org.apache.commons.lang.time.FastDateFormat; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.fusesource.leveldbjni.JniDBFactory; |
| import org.iq80.leveldb.DB; |
| import org.iq80.leveldb.Options; |
| import org.iq80.leveldb.WriteBatch; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Contains the logic to lookup a leveldb by timestamp so that multiple smaller |
| * databases can roll according to the configured period and evicted efficiently |
| * via operating system directory removal. |
| */ |
| class RollingLevelDB { |
| |
| /** Logger for this class. */ |
| private static final Logger LOG = |
| LoggerFactory.getLogger(RollingLevelDB.class); |
| /** Factory to open and create new leveldb instances. */ |
| private static JniDBFactory factory = new JniDBFactory(); |
| /** Thread safe date formatter. */ |
| private FastDateFormat fdf; |
| /** Date parser. */ |
| private SimpleDateFormat sdf; |
| /** Calendar to calculate the current and next rolling period. */ |
| private GregorianCalendar cal = new GregorianCalendar( |
| TimeZone.getTimeZone("GMT")); |
| /** Collection of all active rolling leveldb instances. */ |
| private final TreeMap<Long, DB> rollingdbs; |
| /** Collection of all rolling leveldb instances to evict. */ |
| private final TreeMap<Long, DB> rollingdbsToEvict; |
| /** Name of this rolling level db. */ |
| private final String name; |
| /** Calculated timestamp of when to roll a new leveldb instance. */ |
| private volatile long nextRollingCheckMillis = 0; |
| /** File system instance to find and create new leveldb instances. */ |
| private FileSystem lfs = null; |
| /** Directory to store rolling leveldb instances. */ |
| private Path rollingDBPath; |
| /** Configuration for this object. */ |
| private Configuration conf; |
| /** Rolling period. */ |
| private RollingPeriod rollingPeriod; |
| /** |
| * Rolling leveldb instances are evicted when their endtime is earlier than |
| * the current time minus the time to live value. |
| */ |
| private long ttl; |
| /** Whether time to live is enabled. */ |
| private boolean ttlEnabled; |
| |
| /** Encapsulates the rolling period to date format lookup. */ |
| enum RollingPeriod { |
| DAILY { |
| @Override |
| public String dateFormat() { |
| return "yyyy-MM-dd"; |
| } |
| }, |
| HALF_DAILY { |
| @Override |
| public String dateFormat() { |
| return "yyyy-MM-dd-HH"; |
| } |
| }, |
| QUARTER_DAILY { |
| @Override |
| public String dateFormat() { |
| return "yyyy-MM-dd-HH"; |
| } |
| }, |
| HOURLY { |
| @Override |
| public String dateFormat() { |
| return "yyyy-MM-dd-HH"; |
| } |
| }, |
| MINUTELY { |
| @Override |
| public String dateFormat() { |
| return "yyyy-MM-dd-HH-mm"; |
| } |
| }; |
| public abstract String dateFormat(); |
| } |
| |
| /** |
| * Convenience class for associating a write batch with its rolling leveldb |
| * instance. |
| */ |
| public static class RollingWriteBatch { |
| /** Leveldb object. */ |
| private final DB db; |
| /** Write batch for the db object. */ |
| private final WriteBatch writeBatch; |
| |
| public RollingWriteBatch(final DB db, final WriteBatch writeBatch) { |
| this.db = db; |
| this.writeBatch = writeBatch; |
| } |
| |
| public DB getDB() { |
| return db; |
| } |
| |
| public WriteBatch getWriteBatch() { |
| return writeBatch; |
| } |
| |
| public void write() { |
| db.write(writeBatch); |
| } |
| |
| public void close() { |
| IOUtils.cleanupWithLogger(LOG, writeBatch); |
| } |
| } |
| |
| RollingLevelDB(String name) { |
| this.name = name; |
| this.rollingdbs = new TreeMap<Long, DB>(); |
| this.rollingdbsToEvict = new TreeMap<Long, DB>(); |
| } |
| |
| protected String getName() { |
| return name; |
| } |
| |
| protected long currentTimeMillis() { |
| return System.currentTimeMillis(); |
| } |
| |
| public long getNextRollingTimeMillis() { |
| return nextRollingCheckMillis; |
| } |
| |
| public long getTimeToLive() { |
| return ttl; |
| } |
| |
| public boolean getTimeToLiveEnabled() { |
| return ttlEnabled; |
| } |
| |
| protected void setNextRollingTimeMillis(final long timestamp) { |
| this.nextRollingCheckMillis = timestamp; |
| LOG.info("Next rolling time for " + getName() + " is " |
| + fdf.format(nextRollingCheckMillis)); |
| } |
| |
| public void init(final Configuration config) throws Exception { |
| LOG.info("Initializing RollingLevelDB for " + getName()); |
| this.conf = config; |
| this.ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, |
| YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS); |
| this.ttlEnabled = conf.getBoolean( |
| YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true); |
| this.rollingDBPath = new Path( |
| conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), |
| RollingLevelDBTimelineStore.FILENAME); |
| initFileSystem(); |
| initRollingPeriod(); |
| initHistoricalDBs(); |
| } |
| |
| protected void initFileSystem() throws IOException { |
| lfs = FileSystem.getLocal(conf); |
| boolean success = lfs.mkdirs(rollingDBPath, |
| RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK); |
| if (!success) { |
| throw new IOException("Failed to create leveldb root directory " |
| + rollingDBPath); |
| } |
| } |
| |
| protected synchronized void initRollingPeriod() { |
| final String lcRollingPeriod = conf.get( |
| YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD, |
| YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD); |
| this.rollingPeriod = RollingPeriod.valueOf(lcRollingPeriod |
| .toUpperCase(Locale.ENGLISH)); |
| fdf = FastDateFormat.getInstance(rollingPeriod.dateFormat(), |
| TimeZone.getTimeZone("GMT")); |
| sdf = new SimpleDateFormat(rollingPeriod.dateFormat()); |
| sdf.setTimeZone(fdf.getTimeZone()); |
| } |
| |
| protected synchronized void initHistoricalDBs() throws IOException { |
| Path rollingDBGlobPath = new Path(rollingDBPath, getName() + ".*"); |
| FileStatus[] statuses = lfs.globStatus(rollingDBGlobPath); |
| for (FileStatus status : statuses) { |
| String dbName = FilenameUtils.getExtension(status.getPath().toString()); |
| try { |
| Long dbStartTime = sdf.parse(dbName).getTime(); |
| initRollingLevelDB(dbStartTime, status.getPath()); |
| } catch (ParseException pe) { |
| LOG.warn("Failed to initialize rolling leveldb " + dbName + " for " |
| + getName()); |
| } |
| } |
| } |
| |
| private void initRollingLevelDB(Long dbStartTime, |
| Path rollingInstanceDBPath) { |
| if (rollingdbs.containsKey(dbStartTime)) { |
| return; |
| } |
| Options options = new Options(); |
| options.createIfMissing(true); |
| options.cacheSize(conf.getLong( |
| YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, |
| YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); |
| options.maxOpenFiles(conf.getInt( |
| YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, |
| YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES)); |
| options.writeBufferSize(conf.getInt( |
| YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE, |
| YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE)); |
| LOG.info("Initializing rolling leveldb instance :" + rollingInstanceDBPath |
| + " for start time: " + dbStartTime); |
| DB db = null; |
| try { |
| db = factory.open( |
| new File(rollingInstanceDBPath.toUri().getPath()), options); |
| rollingdbs.put(dbStartTime, db); |
| String dbName = fdf.format(dbStartTime); |
| LOG.info("Added rolling leveldb instance " + dbName + " to " + getName()); |
| } catch (IOException ioe) { |
| LOG.warn("Failed to open rolling leveldb instance :" |
| + new File(rollingInstanceDBPath.toUri().getPath()), ioe); |
| } |
| } |
| |
| synchronized DB getPreviousDB(DB db) { |
| Iterator<DB> iterator = rollingdbs.values().iterator(); |
| DB prev = null; |
| while (iterator.hasNext()) { |
| DB cur = iterator.next(); |
| if (cur == db) { |
| break; |
| } |
| prev = cur; |
| } |
| return prev; |
| } |
| |
| synchronized long getStartTimeFor(DB db) { |
| long startTime = -1; |
| for (Map.Entry<Long, DB> entry : rollingdbs.entrySet()) { |
| if (entry.getValue() == db) { |
| startTime = entry.getKey(); |
| } |
| } |
| return startTime; |
| } |
| |
| public synchronized DB getDBForStartTime(long startTime) { |
| // make sure we sanitize this input |
| startTime = Math.min(startTime, currentTimeMillis()); |
| |
| if (startTime >= getNextRollingTimeMillis()) { |
| roll(startTime); |
| } |
| Entry<Long, DB> entry = rollingdbs.floorEntry(startTime); |
| if (entry == null) { |
| return null; |
| } |
| return entry.getValue(); |
| } |
| |
| private void roll(long startTime) { |
| LOG.info("Rolling new DB instance for " + getName()); |
| long currentStartTime = computeCurrentCheckMillis(startTime); |
| setNextRollingTimeMillis(computeNextCheckMillis(currentStartTime)); |
| String currentRollingDBInstance = fdf.format(currentStartTime); |
| String currentRollingDBName = getName() + "." + currentRollingDBInstance; |
| Path currentRollingDBPath = new Path(rollingDBPath, currentRollingDBName); |
| if (getTimeToLiveEnabled()) { |
| scheduleOldDBsForEviction(); |
| } |
| initRollingLevelDB(currentStartTime, currentRollingDBPath); |
| } |
| |
| private synchronized void scheduleOldDBsForEviction() { |
| // keep at least time to live amount of data |
| long evictionThreshold = computeCurrentCheckMillis(currentTimeMillis() |
| - getTimeToLive()); |
| |
| LOG.info("Scheduling " + getName() + " DBs older than " |
| + fdf.format(evictionThreshold) + " for eviction"); |
| Iterator<Entry<Long, DB>> iterator = rollingdbs.entrySet().iterator(); |
| while (iterator.hasNext()) { |
| Entry<Long, DB> entry = iterator.next(); |
| // parse this in gmt time |
| if (entry.getKey() < evictionThreshold) { |
| LOG.info("Scheduling " + getName() + " eviction for " |
| + fdf.format(entry.getKey())); |
| iterator.remove(); |
| rollingdbsToEvict.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| } |
| |
| public synchronized void evictOldDBs() { |
| LOG.info("Evicting " + getName() + " DBs scheduled for eviction"); |
| Iterator<Entry<Long, DB>> iterator = rollingdbsToEvict.entrySet() |
| .iterator(); |
| while (iterator.hasNext()) { |
| Entry<Long, DB> entry = iterator.next(); |
| IOUtils.cleanupWithLogger(LOG, entry.getValue()); |
| String dbName = fdf.format(entry.getKey()); |
| Path path = new Path(rollingDBPath, getName() + "." + dbName); |
| try { |
| LOG.info("Removing old db directory contents in " + path); |
| lfs.delete(path, true); |
| } catch (IOException ioe) { |
| LOG.warn("Failed to evict old db " + path, ioe); |
| } |
| iterator.remove(); |
| } |
| } |
| |
| public void stop() throws Exception { |
| for (DB db : rollingdbs.values()) { |
| IOUtils.cleanupWithLogger(LOG, db); |
| } |
| IOUtils.cleanupWithLogger(LOG, lfs); |
| } |
| |
| private long computeNextCheckMillis(long now) { |
| return computeCheckMillis(now, true); |
| } |
| |
| public long computeCurrentCheckMillis(long now) { |
| return computeCheckMillis(now, false); |
| } |
| |
| private synchronized long computeCheckMillis(long now, boolean next) { |
| // needs to be called synchronously due to shared Calendar |
| cal.setTimeInMillis(now); |
| cal.set(Calendar.SECOND, 0); |
| cal.set(Calendar.MILLISECOND, 0); |
| |
| if (rollingPeriod == RollingPeriod.DAILY) { |
| cal.set(Calendar.HOUR_OF_DAY, 0); |
| cal.set(Calendar.MINUTE, 0); |
| if (next) { |
| cal.add(Calendar.DATE, 1); |
| } |
| } else if (rollingPeriod == RollingPeriod.HALF_DAILY) { |
| // round down to 12 hour interval |
| int hour = (cal.get(Calendar.HOUR) / 12) * 12; |
| cal.set(Calendar.HOUR, hour); |
| cal.set(Calendar.MINUTE, 0); |
| if (next) { |
| cal.add(Calendar.HOUR_OF_DAY, 12); |
| } |
| } else if (rollingPeriod == RollingPeriod.QUARTER_DAILY) { |
| // round down to 6 hour interval |
| int hour = (cal.get(Calendar.HOUR) / 6) * 6; |
| cal.set(Calendar.HOUR, hour); |
| cal.set(Calendar.MINUTE, 0); |
| if (next) { |
| cal.add(Calendar.HOUR_OF_DAY, 6); |
| } |
| } else if (rollingPeriod == RollingPeriod.HOURLY) { |
| cal.set(Calendar.MINUTE, 0); |
| if (next) { |
| cal.add(Calendar.HOUR_OF_DAY, 1); |
| } |
| } else if (rollingPeriod == RollingPeriod.MINUTELY) { |
| // round down to 5 minute interval |
| int minute = (cal.get(Calendar.MINUTE) / 5) * 5; |
| cal.set(Calendar.MINUTE, minute); |
| if (next) { |
| cal.add(Calendar.MINUTE, 5); |
| } |
| } |
| return cal.getTimeInMillis(); |
| } |
| } |