blob: af11fc7df4dd13e167bf63eccbfe5dd5b21f4a0d [file] [log] [blame]
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.MultipleIOException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* This class is responsible for splitting up a bunch of regionserver commit log
* files that are no longer being written to, into new files, one per region for
* region to replay on startup. Delete the old log files when finished.
*/
public class HLogSplitter {
private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
/**
* Name of file that holds recovered edits written by the wal log splitting
* code, one per region
*/
public static final String RECOVERED_EDITS = "recovered.edits";
static final Log LOG = LogFactory.getLog(HLogSplitter.class);
private boolean hasSplit = false;
private long splitTime = 0;
private long splitSize = 0;
// Parameters for split process
protected final Path rootDir;
protected final Path srcDir;
protected final Path oldLogDir;
protected final FileSystem fs;
protected final Configuration conf;
// Major subcomponents of the split process.
// These are separated into inner classes to make testing easier.
OutputSink outputSink;
EntryBuffers entryBuffers;
// If an exception is thrown by one of the other threads, it will be
// stored here.
protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
// Wait/notify for when data has been produced by the reader thread,
// consumed by the reader thread, or an exception occurred
Object dataAvailable = new Object();
/**
* Create a new HLogSplitter using the given {@link Configuration} and the
* <code>hbase.hlog.splitter.impl</code> property to derived the instance
* class to use.
*
* @param conf
* @param rootDir hbase directory
* @param srcDir logs directory
* @param oldLogDir directory where processed logs are archived to
* @param fs FileSystem
*/
public static HLogSplitter createLogSplitter(Configuration conf,
final Path rootDir, final Path srcDir,
Path oldLogDir, final FileSystem fs) {
@SuppressWarnings("unchecked")
Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
.getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
try {
Constructor<? extends HLogSplitter> constructor =
splitterClass.getConstructor(
Configuration.class, // conf
Path.class, // rootDir
Path.class, // srcDir
Path.class, // oldLogDir
FileSystem.class); // fs
return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs);
} catch (IllegalArgumentException e) {
throw new RuntimeException(e);
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
} catch (SecurityException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
}
public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
Path oldLogDir, FileSystem fs) {
this.conf = conf;
this.rootDir = rootDir;
this.srcDir = srcDir;
this.oldLogDir = oldLogDir;
this.fs = fs;
entryBuffers = new EntryBuffers(
conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
128*1024*1024));
outputSink = new OutputSink();
}
/**
* Split up a bunch of regionserver commit log files that are no longer being
* written to, into new files, one per region for region to replay on startup.
* Delete the old log files when finished.
*
* @throws IOException will throw if corrupted hlogs aren't tolerated
* @return the list of splits
*/
public List<Path> splitLog()
throws IOException {
Preconditions.checkState(!hasSplit,
"An HLogSplitter instance may only be used once");
hasSplit = true;
long startTime = System.currentTimeMillis();
List<Path> splits = null;
if (!fs.exists(srcDir)) {
// Nothing to do
return splits;
}
FileStatus[] logfiles = fs.listStatus(srcDir);
if (logfiles == null || logfiles.length == 0) {
// Nothing to do
return splits;
}
LOG.info("Splitting " + logfiles.length + " hlog(s) in "
+ srcDir.toString());
splits = splitLog(logfiles);
splitTime = System.currentTimeMillis() - startTime;
LOG.info("hlog file splitting completed in " + splitTime +
" ms for " + srcDir.toString());
return splits;
}
/**
* @return time that this split took
*/
public long getTime() {
return this.splitTime;
}
/**
* @return aggregate size of hlogs that were split
*/
public long getSize() {
return this.splitSize;
}
/**
* @return a map from encoded region ID to the number of edits written out
* for that region.
*/
Map<byte[], Long> getOutputCounts() {
Preconditions.checkState(hasSplit);
return outputSink.getOutputCounts();
}
/**
* Splits the HLog edits in the given list of logfiles (that are a mix of edits
* on multiple regions) by region and then splits them per region directories,
* in batches of (hbase.hlog.split.batch.size)
*
* This process is split into multiple threads. In the main thread, we loop
* through the logs to be split. For each log, we:
* <ul>
* <li> Recover it (take and drop HDFS lease) to ensure no other process can write</li>
* <li> Read each edit (see {@link #parseHLog}</li>
* <li> Mark as "processed" or "corrupt" depending on outcome</li>
* </ul>
*
* Each edit is passed into the EntryBuffers instance, which takes care of
* memory accounting and splitting the edits by region.
*
* The OutputSink object then manages N other WriterThreads which pull chunks
* of edits from EntryBuffers and write them to the output region directories.
*
* After the process is complete, the log files are archived to a separate
* directory.
*/
private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
List<Path> processedLogs = new ArrayList<Path>();
List<Path> corruptedLogs = new ArrayList<Path>();
List<Path> splits = null;
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
splitSize = 0;
outputSink.startWriterThreads(entryBuffers);
try {
int i = 0;
for (FileStatus log : logfiles) {
Path logPath = log.getPath();
long logLength = log.getLen();
splitSize += logLength;
LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
+ ": " + logPath + ", length=" + logLength);
try {
recoverFileLease(fs, logPath, conf);
parseHLog(log, entryBuffers, fs, conf);
processedLogs.add(logPath);
} catch (EOFException eof) {
// truncated files are expected if a RS crashes (see HBASE-2643)
LOG.info("EOF from hlog " + logPath + ". continuing");
processedLogs.add(logPath);
} catch (IOException e) {
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (e.getCause() instanceof ParseException) {
LOG.warn("ParseException from hlog " + logPath + ". continuing");
processedLogs.add(logPath);
} else {
if (skipErrors) {
LOG.info("Got while parsing hlog " + logPath +
". Marking as corrupted", e);
corruptedLogs.add(logPath);
} else {
throw e;
}
}
}
}
if (fs.listStatus(srcDir).length > processedLogs.size()
+ corruptedLogs.size()) {
throw new OrphanHLogAfterSplitException(
"Discovered orphan hlog after split. Maybe the "
+ "HRegionServer was not dead when we started");
}
archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
} finally {
splits = outputSink.finishWritingAndClose();
}
return splits;
}
/**
* Moves processed logs to a oldLogDir after successful processing Moves
* corrupted logs (any log that couldn't be successfully parsed to corruptDir
* (.corrupt) for later investigation
*
* @param corruptedLogs
* @param processedLogs
* @param oldLogDir
* @param fs
* @param conf
* @throws IOException
*/
private static void archiveLogs(
final Path srcDir,
final List<Path> corruptedLogs,
final List<Path> processedLogs, final Path oldLogDir,
final FileSystem fs, final Configuration conf) throws IOException {
final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
"hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
if (!fs.mkdirs(corruptDir)) {
LOG.info("Unable to mkdir " + corruptDir);
}
fs.mkdirs(oldLogDir);
for (Path corrupted : corruptedLogs) {
Path p = new Path(corruptDir, corrupted.getName());
if (!fs.rename(corrupted, p)) {
LOG.info("Unable to move corrupted log " + corrupted + " to " + p);
} else {
LOG.info("Moving corrupted log " + corrupted + " to " + p);
}
}
for (Path p : processedLogs) {
Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
if (!fs.rename(p, newPath)) {
LOG.info("Unable to move " + p + " to " + newPath);
} else {
LOG.info("Archived processed log " + p + " to " + newPath);
}
}
if (!fs.delete(srcDir, true)) {
throw new IOException("Unable to delete src dir: " + srcDir);
}
}
/**
* Path to a file under RECOVERED_EDITS_DIR directory of the region found in
* <code>logEntry</code> named for the sequenceid in the passed
* <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
* This method also ensures existence of RECOVERED_EDITS_DIR under the region
* creating it if necessary.
* @param fs
* @param logEntry
* @param rootDir HBase root dir.
* @return Path to file into which to dump split log edits.
* @throws IOException
*/
static Path getRegionSplitEditsPath(final FileSystem fs,
final Entry logEntry, final Path rootDir) throws IOException {
Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey()
.getTablename());
Path regiondir = HRegion.getRegionDir(tableDir,
Bytes.toString(logEntry.getKey().getEncodedRegionName()));
if (!fs.exists(regiondir)) {
LOG.info("This region's directory doesn't exist: "
+ regiondir.toString() + ". It is very likely that it was" +
" already split so it's safe to discard those edits.");
return null;
}
Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
if (!fs.exists(dir)) {
if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
}
return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
.getLogSeqNum()));
}
static String formatRecoveredEditsFileName(final long seqid) {
return String.format("%019d", seqid);
}
/*
* Parse a single hlog and put the edits in @splitLogsMap
*
* @param logfile to split
* @param splitLogsMap output parameter: a map with region names as keys and a
* list of edits as values
* @param fs the filesystem
* @param conf the configuration
* @throws IOException if hlog is corrupted, or can't be open
*/
private void parseHLog(final FileStatus logfile,
EntryBuffers entryBuffers, final FileSystem fs,
final Configuration conf)
throws IOException {
// Check for possibly empty file. With appends, currently Hadoop reports a
// zero length even if the file has been sync'd. Revisit if HDFS-376 or
// HDFS-878 is committed.
long length = logfile.getLen();
if (length <= 0) {
LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
}
Path path = logfile.getPath();
Reader in;
int editsCount = 0;
try {
in = getReader(fs, path, conf);
} catch (EOFException e) {
if (length <= 0) {
//TODO should we ignore an empty, not-last log file if skip.errors is false?
//Either way, the caller should decide what to do. E.g. ignore if this is the last
//log in sequence.
//TODO is this scenario still possible if the log has been recovered (i.e. closed)
LOG.warn("Could not open " + path + " for reading. File is empty" + e);
return;
} else {
throw e;
}
}
try {
Entry entry;
while ((entry = in.next()) != null) {
entryBuffers.appendEntry(entry);
editsCount++;
}
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
} finally {
LOG.debug("Pushed=" + editsCount + " entries from " + path);
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
LOG.warn("Close log reader in finally threw exception -- continuing",
e);
}
}
}
private void writerThreadError(Throwable t) {
thrown.compareAndSet(null, t);
}
/**
* Check for errors in the writer threads. If any is found, rethrow it.
*/
private void checkForErrors() throws IOException {
Throwable thrown = this.thrown.get();
if (thrown == null) return;
if (thrown instanceof IOException) {
throw (IOException)thrown;
} else {
throw new RuntimeException(thrown);
}
}
/**
* Create a new {@link Writer} for writing log splits.
*/
protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
return HLog.createWriter(fs, logfile, conf);
}
/**
* Create a new {@link Reader} for reading logs to split.
*/
protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
throws IOException {
return HLog.getReader(fs, curLogFile, conf);
}
/**
* Class which accumulates edits and separates them into a buffer per region
* while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
* a predefined threshold.
*
* Writer threads then pull region-specific buffers from this class.
*/
class EntryBuffers {
Map<byte[], RegionEntryBuffer> buffers =
new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
/* Track which regions are currently in the middle of writing. We don't allow
an IO thread to pick up bytes from a region if we're already writing
data for that region in a different IO thread. */
Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
long totalBuffered = 0;
long maxHeapUsage;
EntryBuffers(long maxHeapUsage) {
this.maxHeapUsage = maxHeapUsage;
}
/**
* Append a log entry into the corresponding region buffer.
* Blocks if the total heap usage has crossed the specified threshold.
*
* @throws InterruptedException
* @throws IOException
*/
void appendEntry(Entry entry) throws InterruptedException, IOException {
HLogKey key = entry.getKey();
RegionEntryBuffer buffer;
synchronized (this) {
buffer = buffers.get(key.getEncodedRegionName());
if (buffer == null) {
buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
buffers.put(key.getEncodedRegionName(), buffer);
}
long incrHeap = buffer.appendEntry(entry);
totalBuffered += incrHeap;
}
// If we crossed the chunk threshold, wait for more space to be available
synchronized (dataAvailable) {
while (totalBuffered > maxHeapUsage && thrown == null) {
LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
dataAvailable.wait(3000);
}
dataAvailable.notifyAll();
}
checkForErrors();
}
synchronized RegionEntryBuffer getChunkToWrite() {
long biggestSize=0;
byte[] biggestBufferKey=null;
for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
long size = entry.getValue().heapSize();
if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) {
biggestSize = size;
biggestBufferKey = entry.getKey();
}
}
if (biggestBufferKey == null) {
return null;
}
RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
currentlyWriting.add(biggestBufferKey);
return buffer;
}
void doneWriting(RegionEntryBuffer buffer) {
synchronized (this) {
boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
assert removed;
}
long size = buffer.heapSize();
synchronized (dataAvailable) {
totalBuffered -= size;
// We may unblock writers
dataAvailable.notifyAll();
}
}
synchronized boolean isRegionCurrentlyWriting(byte[] region) {
return currentlyWriting.contains(region);
}
}
/**
* A buffer of some number of edits for a given region.
* This accumulates edits and also provides a memory optimization in order to
* share a single byte array instance for the table and region name.
* Also tracks memory usage of the accumulated edits.
*/
static class RegionEntryBuffer implements HeapSize {
long heapInBuffer = 0;
List<Entry> entryBuffer;
byte[] tableName;
byte[] encodedRegionName;
RegionEntryBuffer(byte[] table, byte[] region) {
this.tableName = table;
this.encodedRegionName = region;
this.entryBuffer = new LinkedList<Entry>();
}
long appendEntry(Entry entry) {
internify(entry);
entryBuffer.add(entry);
long incrHeap = entry.getEdit().heapSize() +
ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
0; // TODO linkedlist entry
heapInBuffer += incrHeap;
return incrHeap;
}
private void internify(Entry entry) {
HLogKey k = entry.getKey();
k.internTableName(this.tableName);
k.internEncodedRegionName(this.encodedRegionName);
}
public long heapSize() {
return heapInBuffer;
}
}
class WriterThread extends Thread {
private volatile boolean shouldStop = false;
WriterThread(int i) {
super("WriterThread-" + i);
}
public void run() {
try {
doRun();
} catch (Throwable t) {
LOG.error("Error in log splitting write thread", t);
writerThreadError(t);
}
}
private void doRun() throws IOException {
LOG.debug("Writer thread " + this + ": starting");
while (true) {
RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
if (buffer == null) {
// No data currently available, wait on some more to show up
synchronized (dataAvailable) {
if (shouldStop) return;
try {
dataAvailable.wait(1000);
} catch (InterruptedException ie) {
if (!shouldStop) {
throw new RuntimeException(ie);
}
}
}
continue;
}
assert buffer != null;
try {
writeBuffer(buffer);
} finally {
entryBuffers.doneWriting(buffer);
}
}
}
private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) {
LOG.warn(this.getName() + " got an empty buffer, skipping");
return;
}
WriterAndPath wap = null;
long startTime = System.nanoTime();
try {
int editsCount = 0;
for (Entry logEntry : entries) {
if (wap == null) {
wap = outputSink.getWriterAndPath(logEntry);
if (wap == null) {
// getWriterAndPath decided we don't need to write these edits
// Message was already logged
return;
}
}
wap.w.append(logEntry);
editsCount++;
}
// Pass along summary statistics
wap.incrementEdits(editsCount);
wap.incrementNanoTime(System.nanoTime() - startTime);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.fatal(this.getName() + " Got while writing log entry to log", e);
throw e;
}
}
void finish() {
shouldStop = true;
}
}
/**
* Class that manages the output streams from the log splitting process.
*/
class OutputSink {
private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
private final List<WriterThread> writerThreads = Lists.newArrayList();
/* Set of regions which we've decided should not output edits */
private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
private boolean hasClosed = false;
/**
* Start the threads that will pump data from the entryBuffers
* to the output files.
* @return the list of started threads
*/
synchronized void startWriterThreads(EntryBuffers entryBuffers) {
// More threads could potentially write faster at the expense
// of causing more disk seeks as the logs are split.
// 3. After a certain setting (probably around 3) the
// process will be bound on the reader in the current
// implementation anyway.
int numThreads = conf.getInt(
"hbase.regionserver.hlog.splitlog.writer.threads", 3);
for (int i = 0; i < numThreads; i++) {
WriterThread t = new WriterThread(i);
t.start();
writerThreads.add(t);
}
}
List<Path> finishWritingAndClose() throws IOException {
LOG.info("Waiting for split writer threads to finish");
for (WriterThread t : writerThreads) {
t.finish();
}
for (WriterThread t: writerThreads) {
try {
t.join();
} catch (InterruptedException ie) {
throw new IOException(ie);
}
checkForErrors();
}
LOG.info("Split writers finished");
return closeStreams();
}
/**
* Close all of the output streams.
* @return the list of paths written.
*/
private List<Path> closeStreams() throws IOException {
Preconditions.checkState(!hasClosed);
List<Path> paths = new ArrayList<Path>();
List<IOException> thrown = Lists.newArrayList();
for (WriterAndPath wap : logWriters.values()) {
try {
wap.w.close();
} catch (IOException ioe) {
LOG.error("Couldn't close log at " + wap.p, ioe);
thrown.add(ioe);
continue;
}
paths.add(wap.p);
LOG.info("Closed path " + wap.p +" (wrote " + wap.editsWritten + " edits in "
+ (wap.nanosSpent / 1000/ 1000) + "ms)");
}
if (!thrown.isEmpty()) {
throw MultipleIOException.createIOException(thrown);
}
hasClosed = true;
return paths;
}
/**
* Get a writer and path for a log starting at the given entry.
*
* This function is threadsafe so long as multiple threads are always
* acting on different regions.
*
* @return null if this region shouldn't output any logs
*/
WriterAndPath getWriterAndPath(Entry entry) throws IOException {
byte region[] = entry.getKey().getEncodedRegionName();
WriterAndPath ret = logWriters.get(region);
if (ret != null) {
return ret;
}
// If we already decided that this region doesn't get any output
// we don't need to check again.
if (blacklistedRegions.contains(region)) {
return null;
}
// Need to create writer
Path regionedits = getRegionSplitEditsPath(fs,
entry, rootDir);
if (regionedits == null) {
// Edits dir doesn't exist
blacklistedRegions.add(region);
return null;
}
deletePreexistingOldEdits(regionedits);
Writer w = createWriter(fs, regionedits, conf);
ret = new WriterAndPath(regionedits, w);
logWriters.put(region, ret);
LOG.debug("Creating writer path=" + regionedits + " region="
+ Bytes.toStringBinary(region));
return ret;
}
/**
* If the specified path exists, issue a warning and delete it.
*/
private void deletePreexistingOldEdits(Path regionedits) throws IOException {
if (fs.exists(regionedits)) {
LOG.warn("Found existing old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting "
+ regionedits + ", length="
+ fs.getFileStatus(regionedits).getLen());
if (!fs.delete(regionedits, false)) {
LOG.warn("Failed delete of old " + regionedits);
}
}
}
/**
* @return a map from encoded region ID to the number of edits written out
* for that region.
*/
private Map<byte[], Long> getOutputCounts() {
TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(
Bytes.BYTES_COMPARATOR);
synchronized (logWriters) {
for (Map.Entry<byte[], WriterAndPath> entry : logWriters.entrySet()) {
ret.put(entry.getKey(), entry.getValue().editsWritten);
}
}
return ret;
}
}
/**
* Private data structure that wraps a Writer and its Path,
* also collecting statistics about the data written to this
* output.
*/
private final static class WriterAndPath {
final Path p;
final Writer w;
/* Count of edits written to this path */
long editsWritten = 0;
/* Number of nanos spent writing to this log */
long nanosSpent = 0;
WriterAndPath(final Path p, final Writer w) {
this.p = p;
this.w = w;
}
void incrementEdits(int edits) {
editsWritten += edits;
}
void incrementNanoTime(long nanos) {
nanosSpent += nanos;
}
}
}