HBASE-23298 Refactor LogRecoveredEditsOutputSink and BoundedLogWriterCreationOutputSink (#832)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
new file mode 100644
index 0000000..da952eb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
+import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
+
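+/**
+ * Base class for the recovered edits output sinks. It creates and closes
+ * {@link RecoveredEditsWriter}s, tracks each region's maximum edit sequence id, and moves a
+ * finished file to its completed recovered.edits path when the writer is closed.
+ */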
+@InterfaceAudience.Private
+abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractRecoveredEditsOutputSink.class);
+ private final WALSplitter walSplitter;
+ private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>();
+
+ public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
+ WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+ super(controller, entryBuffers, numWriters);
+ this.walSplitter = walSplitter;
+ }
+
+ /**
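+ * Creates a new {@link RecoveredEditsWriter}, first deleting any leftover edits file from a
+ * previous failed split attempt at the same path.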
+ * @return a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close.
+ */
+ protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
+ long seqId) throws IOException {
+ Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
+ walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
+ walSplitter.conf);
+ if (walSplitter.walFS.exists(regionEditsPath)) {
+ LOG.warn("Found old edits file. It could be the " +
+ "result of a previous failed split attempt. Deleting " + regionEditsPath + ", length=" +
+ walSplitter.walFS.getFileStatus(regionEditsPath).getLen());
+ if (!walSplitter.walFS.delete(regionEditsPath, false)) {
+ LOG.warn("Failed delete of old {}", regionEditsPath);
+ }
+ }
+ WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
+ LOG.info("Creating recovered edits writer path={}", regionEditsPath);
+ return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
+ }
+
+ protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
+ List<IOException> thrown) throws IOException {
+ try {
+ editsWriter.writer.close();
+ } catch (IOException ioe) {
+ LOG.error("Could not close recovered edits at {}", editsWriter.path, ioe);
+ thrown.add(ioe);
+ return null;
+ }
+ LOG.info("Closed recovered edits writer path={} (wrote {} edits, skipped {} edits in {} ms",
+ editsWriter.path, editsWriter.editsWritten, editsWriter.editsSkipped,
+ editsWriter.nanosSpent / 1000 / 1000);
+ if (editsWriter.editsWritten == 0) {
+ // just remove the empty recovered.edits file
+ if (walSplitter.walFS.exists(editsWriter.path) &&
+ !walSplitter.walFS.delete(editsWriter.path, false)) {
+ LOG.warn("Failed deleting empty {}", editsWriter.path);
+ throw new IOException("Failed deleting empty " + editsWriter.path);
+ }
+ return null;
+ }
+
+ Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
+ regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
+ try {
+ if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) {
+ deleteOneWithFewerEntries(editsWriter, dst);
+ }
+ // The file may not exist for unit tests that create a splitter which reads and
+ // writes the data without touching disk.
+ // TestHLogSplit#testThreading is an example.
+ if (walSplitter.walFS.exists(editsWriter.path)) {
+ if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
+ throw new IOException(
+ "Failed renaming recovered edits " + editsWriter.path + " to " + dst);
+ }
+ LOG.info("Rename recovered edits {} to {}", editsWriter.path, dst);
+ }
+ } catch (IOException ioe) {
+ LOG.error("Could not rename recovered edits {} to {}", editsWriter.path, dst, ioe);
+ thrown.add(ioe);
+ return null;
+ }
+ return dst;
+ }
+
+ @Override
+ public boolean keepRegionEvent(WAL.Entry entry) {
+ ArrayList<Cell> cells = entry.getEdit().getCells();
+ for (Cell cell : cells) {
+ if (WALEdit.isCompactionMarker(cell)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Update region's maximum edit log SeqNum.
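+ * The tracked maximum is later used to name the completed recovered.edits file in
+ * {@link #closeRecoveredEditsWriter}.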
+ */
+ void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
+ synchronized (regionMaximumEditLogSeqNum) {
+ String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
+ Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
+ if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
+ regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
+ }
+ }
+ }
+
+ // Delete the file with fewer WAL entries: the file whose first entry has the smaller
+ // sequence id is assumed to hold more of the region's edits, so we keep that one.
+ private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
+ throws IOException {
+ long dstMinLogSeqNum = -1L;
+ try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
+ WAL.Entry entry = reader.next();
+ if (entry != null) {
+ dstMinLogSeqNum = entry.getKey().getSequenceId();
+ }
+ } catch (EOFException e) {
+ LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
+ e);
+ }
+ if (editsWriter.minLogSeqNum < dstMinLogSeqNum) {
+ LOG.warn("Found existing old edits file. It could be the result of a previous failed" +
+ " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" +
+ walSplitter.walFS.getFileStatus(dst).getLen());
+ if (!walSplitter.walFS.delete(dst, false)) {
+ LOG.warn("Failed deleting of old {}", dst);
+ throw new IOException("Failed deleting of old " + dst);
+ }
+ } else {
+ LOG.warn(
+ "Found existing old edits file and we have less entries. Deleting " + editsWriter.path +
+ ", length=" + walSplitter.walFS.getFileStatus(editsWriter.path).getLen());
+ if (!walSplitter.walFS.delete(editsWriter.path, false)) {
+ LOG.warn("Failed deleting of {}", editsWriter.path);
+ throw new IOException("Failed deleting of " + editsWriter.path);
+ }
+ }
+ }
+
+ /**
+ * Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting
+ * statistics about the data written to this output.
+ */
+ final class RecoveredEditsWriter {
+ /* Count of edits written to this path */
+ long editsWritten = 0;
+ /* Count of edits skipped while writing to this path */
+ long editsSkipped = 0;
+ /* Number of nanos spent writing to this log */
+ long nanosSpent = 0;
+
+ final byte[] encodedRegionName;
+ final Path path;
+ final WALProvider.Writer writer;
+ final long minLogSeqNum;
+
+ RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer,
+ long minLogSeqNum) {
+ this.encodedRegionName = encodedRegionName;
+ this.path = path;
+ this.writer = writer;
+ this.minLogSeqNum = minLogSeqNum;
+ }
+
+ private void incrementEdits(int edits) {
+ editsWritten += edits;
+ }
+
+ private void incrementSkippedEdits(int skipped) {
+ editsSkipped += skipped;
+ totalSkippedEdits.addAndGet(skipped);
+ }
+
+ private void incrementNanoTime(long nanos) {
+ nanosSpent += nanos;
+ }
+
+ void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
+ long startTime = System.nanoTime();
+ try {
+ int editsCount = 0;
+ for (WAL.Entry logEntry : entries) {
+ filterCellByStore(logEntry);
+ if (!logEntry.getEdit().isEmpty()) {
+ writer.append(logEntry);
+ updateRegionMaximumEditLogSeqNum(logEntry);
+ editsCount++;
+ } else {
+ incrementSkippedEdits(1);
+ }
+ }
+ // Pass along summary statistics
+ incrementEdits(editsCount);
+ incrementNanoTime(System.nanoTime() - startTime);
+ } catch (IOException e) {
+ e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
+ LOG.error(HBaseMarkers.FATAL, "Got IOException while writing log entry", e);
+ throw e;
+ }
+ }
+
+ private void filterCellByStore(WAL.Entry logEntry) {
+ Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
+ .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
+ if (MapUtils.isEmpty(maxSeqIdInStores)) {
+ return;
+ }
+ // Create the array list for the cells that aren't filtered.
+ // We make the assumption that most cells will be kept.
+ ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
+ for (Cell cell : logEntry.getEdit().getCells()) {
+ if (WALEdit.isMetaEditFamily(cell)) {
+ keptCells.add(cell);
+ } else {
+ byte[] family = CellUtil.cloneFamily(cell);
+ Long maxSeqId = maxSeqIdInStores.get(family);
+ // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
+ // or the master was crashed before and we can not get the information.
+ if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
+ keptCells.add(cell);
+ }
+ }
+ }
+
+ // Anything in the keptCells array list is still live.
+ // So rather than removing the cells from the array list
+ // which would be an O(n^2) operation, we just replace the list
+ logEntry.getEdit().setCells(keptCells);
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedEntryBuffers.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedEntryBuffers.java
new file mode 100644
index 0000000..ed3c8b7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedEntryBuffers.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used for {@link BoundedRecoveredEditsOutputSink}. The core of bounding the number of open
+ * writers is that {@link #getChunkToWrite()} returns a chunk only once the buffered heap size
+ * exceeds maxHeapUsage, so a writer does not have to be created per region during splitting.
+ * Each returned {@link EntryBuffers.RegionEntryBuffer} is written to a recovered edits file
+ * whose writer is closed immediately afterwards.
+ * See {@link BoundedRecoveredEditsOutputSink#append(EntryBuffers.RegionEntryBuffer)} for more
+ * details.
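+ *
+ * <p>A minimal usage sketch (names are illustrative; {@code controller} and {@code entry}
+ * are assumed to be in scope):
+ * <pre>
+ * BoundedEntryBuffers buffers = new BoundedEntryBuffers(controller, 128 * 1024 * 1024);
+ * buffers.appendEntry(entry); // accumulate edits, blocking once past maxHeapUsage
+ * // Returns null until totalBuffered >= maxHeapUsage, then drains the biggest buffer:
+ * EntryBuffers.RegionEntryBuffer chunk = buffers.getChunkToWrite();
+ * </pre>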
+ */
+@InterfaceAudience.Private
+public class BoundedEntryBuffers extends EntryBuffers {
+
+ public BoundedEntryBuffers(WALSplitter.PipelineController controller, long maxHeapUsage) {
+ super(controller, maxHeapUsage);
+ }
+
+ @Override
+ synchronized RegionEntryBuffer getChunkToWrite() {
+ if (totalBuffered < maxHeapUsage) {
+ return null;
+ }
+ return super.getChunkToWrite();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java
deleted file mode 100644
index 77b8f93..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class that manages the output streams from the log splitting process.
- * Bounded means the output streams will be no more than the size of threadpool
- */
-@InterfaceAudience.Private
-public class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
- private static final Logger LOG =
- LoggerFactory.getLogger(BoundedLogWriterCreationOutputSink.class);
-
- private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
-
- public BoundedLogWriterCreationOutputSink(WALSplitter walSplitter,
- WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
- super(walSplitter, controller, entryBuffers, numWriters);
- }
-
- @Override
- public List<Path> finishWritingAndClose() throws IOException {
- boolean isSuccessful;
- List<Path> result;
- try {
- isSuccessful = finishWriting(false);
- } finally {
- result = close();
- }
- if (isSuccessful) {
- splits = result;
- }
- return splits;
- }
-
- @Override
- boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
- List<Path> paths) throws InterruptedException, ExecutionException {
- for (final Map.Entry<byte[], WALSplitter.RegionEntryBuffer> buffer : entryBuffers.buffers
- .entrySet()) {
- LOG.info("Submitting writeThenClose of {}",
- Bytes.toString(buffer.getValue().encodedRegionName));
- completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Path dst = writeThenClose(buffer.getValue());
- paths.add(dst);
- return null;
- }
- });
- }
- boolean progress_failed = false;
- for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
- Future<Void> future = completionService.take();
- future.get();
- if (!progress_failed && reporter != null && !reporter.progress()) {
- progress_failed = true;
- }
- }
-
- return progress_failed;
- }
-
- /**
- * since the splitting process may create multiple output files, we need a map
- * regionRecoverStatMap to track the output count of each region.
- * @return a map from encoded region ID to the number of edits written out for that region.
- */
- @Override
- public Map<byte[], Long> getOutputCounts() {
- Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
- for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
- regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
- }
- return regionRecoverStatMapResult;
- }
-
- /**
- * @return the number of recovered regions
- */
- @Override
- public int getNumberOfRecoveredRegions() {
- return regionRecoverStatMap.size();
- }
-
- /**
- * Append the buffer to a new recovered edits file, then close it after all done
- * @param buffer contain all entries of a certain region
- * @throws IOException when closeWriter failed
- */
- @Override
- public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
- writeThenClose(buffer);
- }
-
- private Path writeThenClose(WALSplitter.RegionEntryBuffer buffer) throws IOException {
- WALSplitter.WriterAndPath wap = appendBuffer(buffer, false);
- if (wap != null) {
- String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
- Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
- if (value != null) {
- Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
- regionRecoverStatMap.put(encodedRegionName, newValue);
- }
- }
-
- Path dst = null;
- List<IOException> thrown = new ArrayList<>();
- if (wap != null) {
- dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
- }
- if (!thrown.isEmpty()) {
- throw MultipleIOException.createIOException(thrown);
- }
- return dst;
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
new file mode 100644
index 0000000..795192b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that manages the output streams from the log splitting process.
+ * Every region may have many recovered edits files, but the number of open writers is bounded.
+ * Bounded means the number of concurrently open output streams is no more than the size of the
+ * thread pool.
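+ *
+ * <p>A rough sketch of the per-chunk flow in {@code append} (local names are illustrative,
+ * error handling elided):
+ * <pre>
+ * RecoveredEditsWriter writer = createRecoveredEditsWriter(table, region, firstSeqId);
+ * writer.writeRegionEntries(entries); // write the whole chunk
+ * closeRecoveredEditsWriter(writer, thrown); // then close immediately; nothing is cached
+ * </pre>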
+ */
+@InterfaceAudience.Private
+class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BoundedRecoveredEditsOutputSink.class);
+
+ // Since the splitting process may create multiple output files, we need a map
+ // to track the output count of each region.
+ private ConcurrentHashMap<byte[], Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
+ // Counter tracking the number of currently open writers.
+ private final AtomicInteger openingWritersNum = new AtomicInteger(0);
+
+ public BoundedRecoveredEditsOutputSink(WALSplitter walSplitter,
+ WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+ super(walSplitter, controller, entryBuffers, numWriters);
+ }
+
+ @Override
+ public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
+ List<WAL.Entry> entries = buffer.entryBuffer;
+ if (entries.isEmpty()) {
+ LOG.warn("got an empty buffer, skipping");
+ return;
+ }
+ // The key point: create a new writer, write the edits, then close the writer immediately.
+ RecoveredEditsWriter writer =
+ createRecoveredEditsWriter(buffer.tableName, buffer.encodedRegionName,
+ entries.get(0).getKey().getSequenceId());
+ if (writer != null) {
+ openingWritersNum.incrementAndGet();
+ writer.writeRegionEntries(entries);
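+ // Merge this chunk's edit count into the per-region total; a single region may be
+ // spread across many recovered edits files here.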
+ regionEditsWrittenMap.compute(buffer.encodedRegionName,
+ (k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten);
+ List<IOException> thrown = new ArrayList<>();
+ Path dst = closeRecoveredEditsWriter(writer, thrown);
+ splits.add(dst);
+ openingWritersNum.decrementAndGet();
+ if (!thrown.isEmpty()) {
+ throw MultipleIOException.createIOException(thrown);
+ }
+ }
+ }
+
+ @Override
+ public List<Path> close() throws IOException {
+ boolean isSuccessful = true;
+ try {
+ isSuccessful &= finishWriterThreads();
+ } finally {
+ isSuccessful &= writeRemainingEntryBuffers();
+ }
+ return isSuccessful ? splits : null;
+ }
+
+ /**
+ * Write out the remaining RegionEntryBuffers and close the writers.
+ *
+ * @return true when there is no error.
+ */
+ private boolean writeRemainingEntryBuffers() throws IOException {
+ for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) {
+ closeCompletionService.submit(() -> {
+ append(buffer);
+ return null;
+ });
+ }
+ boolean progressFailed = false;
+ try {
+ for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
+ Future<Void> future = closeCompletionService.take();
+ future.get();
+ if (!progressFailed && reporter != null && !reporter.progress()) {
+ progressFailed = true;
+ }
+ }
+ } catch (InterruptedException e) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(e);
+ throw iie;
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ } finally {
+ closeThreadPool.shutdownNow();
+ }
+ return !progressFailed;
+ }
+
+ @Override
+ public Map<byte[], Long> getOutputCounts() {
+ return regionEditsWrittenMap;
+ }
+
+ @Override
+ public int getNumberOfRecoveredRegions() {
+ return regionEditsWrittenMap.size();
+ }
+
+ @Override
+ int getNumOpenWriters() {
+ return openingWritersNum.get();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java
index f0974be..6348e5c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java
@@ -18,58 +18,56 @@
package org.apache.hadoop.hbase.wal;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.apache.yetus.audience.InterfaceAudience;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
/**
* Class which accumulates edits and separates them into a buffer per region while simultaneously
* accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
* pull region-specific buffers from this class.
*/
@InterfaceAudience.Private
-public class EntryBuffers {
+class EntryBuffers {
private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);
- PipelineController controller;
+ private final PipelineController controller;
- Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ final Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
/*
* Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
* up bytes from a region if we're already writing data for that region in a different IO thread.
*/
- Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ private final Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
- long totalBuffered = 0;
- long maxHeapUsage;
- boolean splitWriterCreationBounded;
+ protected long totalBuffered = 0;
+ protected final long maxHeapUsage;
public EntryBuffers(PipelineController controller, long maxHeapUsage) {
- this(controller, maxHeapUsage, false);
- }
-
- public EntryBuffers(PipelineController controller, long maxHeapUsage,
- boolean splitWriterCreationBounded) {
this.controller = controller;
this.maxHeapUsage = maxHeapUsage;
- this.splitWriterCreationBounded = splitWriterCreationBounded;
}
/**
* Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
* crossed the specified threshold.
*/
- public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
+ void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
WALKey key = entry.getKey();
RegionEntryBuffer buffer;
long incrHeap;
@@ -98,13 +96,6 @@
* @return RegionEntryBuffer a buffer of edits to be written.
*/
synchronized RegionEntryBuffer getChunkToWrite() {
- // The core part of limiting opening writers is it doesn't return chunk only if the
- // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
- // region during splitting. It will flush all the logs in the buffer after splitting
- // through a threadpool, which means the number of writers it created is under control.
- if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
- return null;
- }
long biggestSize = 0;
byte[] biggestBufferKey = null;
@@ -138,21 +129,56 @@
}
}
+ @VisibleForTesting
synchronized boolean isRegionCurrentlyWriting(byte[] region) {
return currentlyWriting.contains(region);
}
- public void waitUntilDrained() {
- synchronized (controller.dataAvailable) {
- while (totalBuffered > 0) {
- try {
- controller.dataAvailable.wait(2000);
- } catch (InterruptedException e) {
- LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
- Thread.interrupted();
- break;
- }
- }
+ /**
+ * A buffer of some number of edits for a given region.
+ * This accumulates edits and also provides a memory optimization in order to
+ * share a single byte array instance for the table and region name.
+ * Also tracks memory usage of the accumulated edits.
+ */
+ static class RegionEntryBuffer implements HeapSize {
+ private long heapInBuffer = 0;
+ final List<WAL.Entry> entryBuffer;
+ final TableName tableName;
+ final byte[] encodedRegionName;
+
+ RegionEntryBuffer(TableName tableName, byte[] region) {
+ this.tableName = tableName;
+ this.encodedRegionName = region;
+ this.entryBuffer = new ArrayList<>();
+ }
+
+ long appendEntry(WAL.Entry entry) {
+ internify(entry);
+ entryBuffer.add(entry);
+ // TODO linkedlist entry
+ long incrHeap = entry.getEdit().heapSize() +
+ ClassSize.align(2 * ClassSize.REFERENCE); // WALKey pointers
+ heapInBuffer += incrHeap;
+ return incrHeap;
+ }
+
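+ // Re-point the key's table name and region name at this buffer's shared instances so
+ // the duplicate per-entry copies can be garbage collected.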
+ private void internify(WAL.Entry entry) {
+ WALKeyImpl k = entry.getKey();
+ k.internTableName(this.tableName);
+ k.internEncodedRegionName(this.encodedRegionName);
+ }
+
+ @Override
+ public long heapSize() {
+ return heapInBuffer;
+ }
+
+ public byte[] getEncodedRegionName() {
+ return encodedRegionName;
+ }
+
+ public TableName getTableName() {
+ return tableName;
}
}
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java
deleted file mode 100644
index 9fc43b1..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
-import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
-
-/**
- * Class that manages the output streams from the log splitting process.
- */
-@InterfaceAudience.Private
-public class LogRecoveredEditsOutputSink extends OutputSink {
- private static final Logger LOG = LoggerFactory.getLogger(LogRecoveredEditsOutputSink.class);
- private WALSplitter walSplitter;
- private FileSystem walFS;
- private Configuration conf;
-
- public LogRecoveredEditsOutputSink(WALSplitter walSplitter,
- WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
- // More threads could potentially write faster at the expense
- // of causing more disk seeks as the logs are split.
- // 3. After a certain setting (probably around 3) the
- // process will be bound on the reader in the current
- // implementation anyway.
- super(controller, entryBuffers, numWriters);
- this.walSplitter = walSplitter;
- this.walFS = walSplitter.walFS;
- this.conf = walSplitter.conf;
- }
-
- /**
- * @return null if failed to report progress
- */
- @Override
- public List<Path> finishWritingAndClose() throws IOException {
- boolean isSuccessful = false;
- List<Path> result = null;
- try {
- isSuccessful = finishWriting(false);
- } finally {
- result = close();
- List<IOException> thrown = closeLogWriters(null);
- if (CollectionUtils.isNotEmpty(thrown)) {
- throw MultipleIOException.createIOException(thrown);
- }
- }
- if (isSuccessful) {
- splits = result;
- }
- return splits;
- }
-
- // delete the one with fewer wal entries
- private void deleteOneWithFewerEntries(WALSplitter.WriterAndPath wap, Path dst)
- throws IOException {
- long dstMinLogSeqNum = -1L;
- try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
- WAL.Entry entry = reader.next();
- if (entry != null) {
- dstMinLogSeqNum = entry.getKey().getSequenceId();
- }
- } catch (EOFException e) {
- LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
- e);
- }
- if (wap.minLogSeqNum < dstMinLogSeqNum) {
- LOG.warn("Found existing old edits file. It could be the result of a previous failed"
- + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
- + walFS.getFileStatus(dst).getLen());
- if (!walFS.delete(dst, false)) {
- LOG.warn("Failed deleting of old {}", dst);
- throw new IOException("Failed deleting of old " + dst);
- }
- } else {
- LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.path
- + ", length=" + walFS.getFileStatus(wap.path).getLen());
- if (!walFS.delete(wap.path, false)) {
- LOG.warn("Failed deleting of {}", wap.path);
- throw new IOException("Failed deleting of " + wap.path);
- }
- }
- }
-
- /**
- * Close all of the output streams.
- * @return the list of paths written.
- */
- List<Path> close() throws IOException {
- Preconditions.checkState(!closeAndCleanCompleted);
-
- final List<Path> paths = new ArrayList<>();
- final List<IOException> thrown = Lists.newArrayList();
- ThreadPoolExecutor closeThreadPool =
- Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
- private int count = 1;
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "split-log-closeStream-" + count++);
- return t;
- }
- });
- CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
- boolean progress_failed;
- try {
- progress_failed = executeCloseTask(completionService, thrown, paths);
- } catch (InterruptedException e) {
- IOException iie = new InterruptedIOException();
- iie.initCause(e);
- throw iie;
- } catch (ExecutionException e) {
- throw new IOException(e.getCause());
- } finally {
- closeThreadPool.shutdownNow();
- }
- if (!thrown.isEmpty()) {
- throw MultipleIOException.createIOException(thrown);
- }
- writersClosed = true;
- closeAndCleanCompleted = true;
- if (progress_failed) {
- return null;
- }
- return paths;
- }
-
- /**
- * @param completionService threadPool to execute the closing tasks
- * @param thrown store the exceptions
- * @param paths arrayList to store the paths written
- * @return if close tasks executed successful
- */
- boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
- List<Path> paths) throws InterruptedException, ExecutionException {
- for (final Map.Entry<String, WALSplitter.SinkWriter> writersEntry : writers.entrySet()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(
- "Submitting close of " + ((WALSplitter.WriterAndPath) writersEntry.getValue()).path);
- }
- completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- WALSplitter.WriterAndPath wap = (WALSplitter.WriterAndPath) writersEntry.getValue();
- Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
- paths.add(dst);
- return null;
- }
- });
- }
- boolean progress_failed = false;
- for (int i = 0, n = this.writers.size(); i < n; i++) {
- Future<Void> future = completionService.take();
- future.get();
- if (!progress_failed && reporter != null && !reporter.progress()) {
- progress_failed = true;
- }
- }
- return progress_failed;
- }
-
- Path closeWriter(String encodedRegionName, WALSplitter.WriterAndPath wap,
- List<IOException> thrown) throws IOException {
- LOG.trace("Closing {}", wap.path);
- try {
- wap.writer.close();
- } catch (IOException ioe) {
- LOG.error("Could not close log at {}", wap.path, ioe);
- thrown.add(ioe);
- return null;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closed wap " + wap.path + " (wrote " + wap.editsWritten + " edits, skipped "
- + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms");
- }
- if (wap.editsWritten == 0) {
- // just remove the empty recovered.edits file
- if (walFS.exists(wap.path) && !walFS.delete(wap.path, false)) {
- LOG.warn("Failed deleting empty {}", wap.path);
- throw new IOException("Failed deleting empty " + wap.path);
- }
- return null;
- }
-
- Path dst = getCompletedRecoveredEditsFilePath(wap.path,
- regionMaximumEditLogSeqNum.get(encodedRegionName));
- try {
- if (!dst.equals(wap.path) && walFS.exists(dst)) {
- deleteOneWithFewerEntries(wap, dst);
- }
- // Skip the unit tests which create a splitter that reads and
- // writes the data without touching disk.
- // TestHLogSplit#testThreading is an example.
- if (walFS.exists(wap.path)) {
- if (!walFS.rename(wap.path, dst)) {
- throw new IOException("Failed renaming " + wap.path + " to " + dst);
- }
- LOG.info("Rename {} to {}", wap.path, dst);
- }
- } catch (IOException ioe) {
- LOG.error("Could not rename {} to {}", wap.path, dst, ioe);
- thrown.add(ioe);
- return null;
- }
- return dst;
- }
-
- private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
- if (writersClosed) {
- return thrown;
- }
- if (thrown == null) {
- thrown = Lists.newArrayList();
- }
- try {
- for (WriterThread writerThread : writerThreads) {
- while (writerThread.isAlive()) {
- writerThread.setShouldStop(true);
- writerThread.interrupt();
- try {
- writerThread.join(10);
- } catch (InterruptedException e) {
- IOException iie = new InterruptedIOException();
- iie.initCause(e);
- throw iie;
- }
- }
- }
- } finally {
- WALSplitter.WriterAndPath wap = null;
- for (WALSplitter.SinkWriter tmpWAP : writers.values()) {
- try {
- wap = (WALSplitter.WriterAndPath) tmpWAP;
- wap.writer.close();
- } catch (IOException ioe) {
- LOG.error("Couldn't close log at {}", wap.path, ioe);
- thrown.add(ioe);
- continue;
- }
- LOG.info("Closed log " + wap.path + " (wrote " + wap.editsWritten + " edits in "
- + (wap.nanosSpent / 1000 / 1000) + "ms)");
- }
- writersClosed = true;
- }
-
- return thrown;
- }
-
- /**
- * Get a writer and path for a log starting at the given entry. This function is threadsafe so
- * long as multiple threads are always acting on different regions.
- * @return null if this region shouldn't output any logs
- */
- WALSplitter.WriterAndPath getWriterAndPath(WAL.Entry entry, boolean reusable) throws IOException {
- byte[] region = entry.getKey().getEncodedRegionName();
- String regionName = Bytes.toString(region);
- WALSplitter.WriterAndPath ret = (WALSplitter.WriterAndPath) writers.get(regionName);
- if (ret != null) {
- return ret;
- }
- // If we already decided that this region doesn't get any output
- // we don't need to check again.
- if (blacklistedRegions.contains(region)) {
- return null;
- }
- ret = createWAP(region, entry);
- if (ret == null) {
- blacklistedRegions.add(region);
- return null;
- }
- if (reusable) {
- writers.put(regionName, ret);
- }
- return ret;
- }
-
- /**
- * @return a path with a write for that path. caller should close.
- */
- WALSplitter.WriterAndPath createWAP(byte[] region, WAL.Entry entry) throws IOException {
- String tmpDirName = walSplitter.conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
- HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
- Path regionedits = getRegionSplitEditsPath(entry,
- walSplitter.getFileBeingSplit().getPath().getName(), tmpDirName, conf);
- if (regionedits == null) {
- return null;
- }
- FileSystem walFs = FSUtils.getWALFileSystem(conf);
- if (walFs.exists(regionedits)) {
- LOG.warn("Found old edits file. It could be the "
- + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
- + walFs.getFileStatus(regionedits).getLen());
- if (!walFs.delete(regionedits, false)) {
- LOG.warn("Failed delete of old {}", regionedits);
- }
- }
- WALProvider.Writer w = walSplitter.createWriter(regionedits);
- LOG.debug("Creating writer path={}", regionedits);
- return new WALSplitter.WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
- }
-
-
-
- void filterCellByStore(WAL.Entry logEntry) {
- Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
- .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
- if (MapUtils.isEmpty(maxSeqIdInStores)) {
- return;
- }
- // Create the array list for the cells that aren't filtered.
- // We make the assumption that most cells will be kept.
- ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
- for (Cell cell : logEntry.getEdit().getCells()) {
- if (WALEdit.isMetaEditFamily(cell)) {
- keptCells.add(cell);
- } else {
- byte[] family = CellUtil.cloneFamily(cell);
- Long maxSeqId = maxSeqIdInStores.get(family);
- // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
- // or the master was crashed before and we can not get the information.
- if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
- keptCells.add(cell);
- }
- }
- }
-
- // Anything in the keptCells array list is still live.
- // So rather than removing the cells from the array list
- // which would be an O(n^2) operation, we just replace the list
- logEntry.getEdit().setCells(keptCells);
- }
-
- @Override
- public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
- appendBuffer(buffer, true);
- }
-
- WALSplitter.WriterAndPath appendBuffer(WALSplitter.RegionEntryBuffer buffer, boolean reusable)
- throws IOException {
- List<WAL.Entry> entries = buffer.entryBuffer;
- if (entries.isEmpty()) {
- LOG.warn("got an empty buffer, skipping");
- return null;
- }
-
- WALSplitter.WriterAndPath wap = null;
-
- long startTime = System.nanoTime();
- try {
- int editsCount = 0;
-
- for (WAL.Entry logEntry : entries) {
- if (wap == null) {
- wap = getWriterAndPath(logEntry, reusable);
- if (wap == null) {
- // This log spews the full edit. Can be massive in the log. Enable only debugging
- // WAL lost edit issues.
- LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
- return null;
- }
- }
- filterCellByStore(logEntry);
- if (!logEntry.getEdit().isEmpty()) {
- wap.writer.append(logEntry);
- this.updateRegionMaximumEditLogSeqNum(logEntry);
- editsCount++;
- } else {
- wap.incrementSkippedEdits(1);
- }
- }
- // Pass along summary statistics
- wap.incrementEdits(editsCount);
- wap.incrementNanoTime(System.nanoTime() - startTime);
- } catch (IOException e) {
- e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
- LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
- throw e;
- }
- return wap;
- }
-
- @Override
- public boolean keepRegionEvent(WAL.Entry entry) {
- ArrayList<Cell> cells = entry.getEdit().getCells();
- for (Cell cell : cells) {
- if (WALEdit.isCompactionMarker(cell)) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * @return a map from encoded region ID to the number of edits written out for that region.
- */
- @Override
- public Map<byte[], Long> getOutputCounts() {
- TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<String, WALSplitter.SinkWriter> entry : writers.entrySet()) {
- ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
- }
- return ret;
- }
-
- @Override
- public int getNumberOfRecoveredRegions() {
- return writers.size();
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
index 729ea8b..4472f62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -19,17 +19,18 @@
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,39 +42,36 @@
* ways of consuming recovered edits.
*/
@InterfaceAudience.Private
-public abstract class OutputSink {
+abstract class OutputSink {
private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);
- protected WALSplitter.PipelineController controller;
- protected EntryBuffers entryBuffers;
+ private final WALSplitter.PipelineController controller;
+ protected final EntryBuffers entryBuffers;
- protected ConcurrentHashMap<String, WALSplitter.SinkWriter> writers = new ConcurrentHashMap<>();
- protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
- new ConcurrentHashMap<>();
-
- protected final List<WriterThread> writerThreads = Lists.newArrayList();
-
- /* Set of regions which we've decided should not output edits */
- protected final Set<byte[]> blacklistedRegions =
- Collections.synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
-
- protected boolean closeAndCleanCompleted = false;
-
- protected boolean writersClosed = false;
+ private final List<WriterThread> writerThreads = Lists.newArrayList();
protected final int numThreads;
protected CancelableProgressable reporter = null;
- protected AtomicLong skippedEdits = new AtomicLong();
+ protected final AtomicLong totalSkippedEdits = new AtomicLong();
- protected List<Path> splits = null;
+ protected final List<Path> splits = new ArrayList<>();
+
+ /**
+ * Used when closing this output sink; subclasses submit their writer-closing tasks here.
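+ *
+ * <p>Typical subclass use (a sketch; {@code closeOneWriter()} and {@code submitted} are
+ * placeholders for the subclass's own close logic and task count):
+ * <pre>
+ * closeCompletionService.submit(() -> { closeOneWriter(); return null; });
+ * for (int i = 0; i &lt; submitted; i++) {
+ *   closeCompletionService.take().get();
+ * }
+ * closeThreadPool.shutdownNow();
+ * </pre>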
+ */
+ protected final ThreadPoolExecutor closeThreadPool;
+ protected final CompletionService<Void> closeCompletionService;
public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
int numWriters) {
- numThreads = numWriters;
+ this.numThreads = numWriters;
this.controller = controller;
this.entryBuffers = entryBuffers;
+ this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
+ Threads.newDaemonThreadFactory("split-log-closeStream-"));
+ this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
}
void setReporter(CancelableProgressable reporter) {
@@ -83,7 +81,7 @@
/**
* Start the threads that will pump data from the entryBuffers to the output files.
*/
- public synchronized void startWriterThreads() {
+ synchronized void startWriterThreads() {
for (int i = 0; i < numThreads; i++) {
WriterThread t = new WriterThread(controller, entryBuffers, this, i);
t.start();
@@ -92,48 +90,20 @@
}
/**
- * Update region's maximum edit log SeqNum.
- */
- void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
- synchronized (regionMaximumEditLogSeqNum) {
- String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
- Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
- if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
- regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
- }
- }
- }
-
- /**
- * @return the number of currently opened writers
- */
- int getNumOpenWriters() {
- return this.writers.size();
- }
-
- long getSkippedEdits() {
- return this.skippedEdits.get();
- }
-
- /**
* Wait for writer threads to dump all info to the sink
+ *
* @return true when there is no error
*/
- protected boolean finishWriting(boolean interrupt) throws IOException {
+ boolean finishWriterThreads() throws IOException {
LOG.debug("Waiting for split writer threads to finish");
- boolean progress_failed = false;
+ boolean progressFailed = false;
for (WriterThread t : writerThreads) {
t.finish();
}
- if (interrupt) {
- for (WriterThread t : writerThreads) {
- t.interrupt(); // interrupt the writer threads. We are stopping now.
- }
- }
for (WriterThread t : writerThreads) {
- if (!progress_failed && reporter != null && !reporter.progress()) {
- progress_failed = true;
+ if (!progressFailed && reporter != null && !reporter.progress()) {
+ progressFailed = true;
}
try {
t.join();
@@ -144,41 +114,42 @@
}
}
controller.checkForErrors();
- LOG.info("{} split writers finished; closing.", this.writerThreads.size());
- return (!progress_failed);
+ LOG.info("{} split writer threads finished", this.writerThreads.size());
+ return (!progressFailed);
}
- public abstract List<Path> finishWritingAndClose() throws IOException;
+ long getTotalSkippedEdits() {
+ return this.totalSkippedEdits.get();
+ }
+
+ /**
+ * @return the number of currently opened writers
+ */
+ abstract int getNumOpenWriters();
+
+ /**
+ * @param buffer A buffer of some number of edits for a given region.
+ */
+ abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
+
+ abstract List<Path> close() throws IOException;
/**
* @return a map from encoded region ID to the number of edits written out for that region.
*/
- public abstract Map<byte[], Long> getOutputCounts();
+ abstract Map<byte[], Long> getOutputCounts();
/**
* @return number of regions we've recovered
*/
- public abstract int getNumberOfRecoveredRegions();
-
- /**
- * @param buffer A WAL Edit Entry
- */
- public abstract void append(WALSplitter.RegionEntryBuffer buffer) throws IOException;
-
- /**
- * WriterThread call this function to help flush internal remaining edits in buffer before close
- * @return true when underlying sink has something to flush
- */
- public boolean flush() throws IOException {
- return false;
- }
+ abstract int getNumberOfRecoveredRegions();
/**
* Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will
* want to get all of those edits.
* @return Return true if this sink wants to accept this region-level WALEdit.
*/
- public abstract boolean keepRegionEvent(WAL.Entry entry);
+ abstract boolean keepRegionEvent(WAL.Entry entry);
public static class WriterThread extends Thread {
private volatile boolean shouldStop = false;
@@ -207,11 +178,11 @@
private void doRun() throws IOException {
LOG.trace("Writer thread starting");
while (true) {
- WALSplitter.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
+ EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
if (buffer == null) {
// No data currently available, wait on some more to show up
synchronized (controller.dataAvailable) {
- if (shouldStop && !this.outputSink.flush()) {
+ if (shouldStop) {
return;
}
try {
@@ -234,15 +205,11 @@
}
}
- private void writeBuffer(WALSplitter.RegionEntryBuffer buffer) throws IOException {
+ private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
outputSink.append(buffer);
}
- void setShouldStop(boolean shouldStop) {
- this.shouldStop = shouldStop;
- }
-
- void finish() {
+ private void finish() {
synchronized (controller.dataAvailable) {
shouldStop = true;
controller.dataAvailable.notifyAll();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
new file mode 100644
index 0000000..ffe805f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Class that manages the output streams from the log splitting process.
+ * Every region has at most one recovered edits file, written through a single cached writer.
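+ *
+ * <p>Unlike {@link BoundedRecoveredEditsOutputSink}, this sink caches one writer per region
+ * and keeps it open until {@code close()}. A simplified sketch of the lookup:
+ * <pre>
+ * RecoveredEditsWriter writer = writers.get(Bytes.toString(region));
+ * if (writer == null) {
+ *   writer = createRecoveredEditsWriter(tableName, region, seqId);
+ *   writers.put(Bytes.toString(region), writer);
+ * }
+ * writer.writeRegionEntries(entries);
+ * </pre>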
+ */
+@InterfaceAudience.Private
+class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
+ private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
+ private ConcurrentMap<String, RecoveredEditsWriter> writers = new ConcurrentHashMap<>();
+
+ public RecoveredEditsOutputSink(WALSplitter walSplitter,
+ WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+ super(walSplitter, controller, entryBuffers, numWriters);
+ }
+
+ @Override
+ public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
+ List<WAL.Entry> entries = buffer.entryBuffer;
+ if (entries.isEmpty()) {
+ LOG.warn("got an empty buffer, skipping");
+ return;
+ }
+ RecoveredEditsWriter writer =
+ getRecoveredEditsWriter(buffer.tableName, buffer.encodedRegionName,
+ entries.get(0).getKey().getSequenceId());
+ if (writer != null) {
+ writer.writeRegionEntries(entries);
+ }
+ }
+
+ /**
+ * Get a writer and path for a log starting at the given entry. This function is thread-safe so
+ * long as multiple threads are always acting on different regions.
+ * @return null if this region shouldn't output any logs
+ */
+ private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region,
+ long seqId) throws IOException {
+ RecoveredEditsWriter ret = writers.get(Bytes.toString(region));
+ if (ret != null) {
+ return ret;
+ }
+ ret = createRecoveredEditsWriter(tableName, region, seqId);
+ if (ret == null) {
+ return null;
+ }
+ writers.put(Bytes.toString(region), ret);
+ return ret;
+ }
+
+ @Override
+ public List<Path> close() throws IOException {
+ boolean isSuccessful = true;
+ try {
+ isSuccessful &= finishWriterThreads();
+ } finally {
+ isSuccessful &= closeWriters();
+ }
+ return isSuccessful ? splits : null;
+ }
+
+ /**
+ * Close all of the output streams.
+ *
+ * @return true when there is no error.
+ */
+ private boolean closeWriters() throws IOException {
+ List<IOException> thrown = Lists.newArrayList();
+ for (RecoveredEditsWriter writer : writers.values()) {
+ closeCompletionService.submit(() -> {
+ Path dst = closeRecoveredEditsWriter(writer, thrown);
+ splits.add(dst);
+ return null;
+ });
+ }
+ boolean progressFailed = false;
+ try {
+ for (int i = 0, n = this.writers.size(); i < n; i++) {
+ Future<Void> future = closeCompletionService.take();
+ future.get();
+ if (!progressFailed && reporter != null && !reporter.progress()) {
+ progressFailed = true;
+ }
+ }
+ } catch (InterruptedException e) {
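+ // Propagate the interrupt as an InterruptedIOException so callers see an IO failure.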
+ IOException iie = new InterruptedIOException();
+ iie.initCause(e);
+ throw iie;
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ } finally {
+ closeThreadPool.shutdownNow();
+ }
+ if (!thrown.isEmpty()) {
+ throw MultipleIOException.createIOException(thrown);
+ }
+ return !progressFailed;
+ }
+
+ @Override
+ public Map<byte[], Long> getOutputCounts() {
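+ // Snapshot of edits written per region, keyed by encoded region name bytes.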
+ TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) {
+ ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
+ }
+ return ret;
+ }
+
+ @Override
+ public int getNumberOfRecoveredRegions() {
+ return writers.size();
+ }
+
+ @Override
+ int getNumOpenWriters() {
+ return writers.size();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index 3aba309..ca0d8ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -163,7 +163,9 @@
* named for the sequenceid in the passed <code>logEntry</code>: e.g.
* /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of
* RECOVERED_EDITS_DIR under the region creating it if necessary.
- * @param walEntry walEntry to recover
+ * @param tableName the table name
+ * @param encodedRegionName the encoded region name
+ * @param seqId the sequence id used to generate the file name
* @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
* @param tmpDirName of the directory used to sideline old recovered edits file
* @param conf configuration
@@ -172,12 +174,12 @@
*/
@SuppressWarnings("deprecation")
@VisibleForTesting
- static Path getRegionSplitEditsPath(final WAL.Entry walEntry, String fileNameBeingSplit,
- String tmpDirName, Configuration conf) throws IOException {
+ static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
+ String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
FileSystem walFS = FSUtils.getWALFileSystem(conf);
- Path tableDir = FSUtils.getWALTableDir(conf, walEntry.getKey().getTableName());
- String encodedRegionName = Bytes.toString(walEntry.getKey().getEncodedRegionName());
- Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
+ Path tableDir = FSUtils.getWALTableDir(conf, tableName);
+ String encodedRegionNameStr = Bytes.toString(encodedRegionName);
+ Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr);
Path dir = getRegionDirRecoveredEditsDir(regionDir);
if (walFS.exists(dir) && walFS.isFile(dir)) {
@@ -185,7 +187,7 @@
if (!walFS.exists(tmp)) {
walFS.mkdirs(tmp);
}
- tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
+ tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr);
LOG.warn("Found existing old file: {}. It could be some "
+ "leftover of an old installation. It should be a folder instead. "
+ "So moving it to {}",
@@ -201,7 +203,7 @@
// Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
// Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
// region's replayRecoveredEdits will not delete it
- String fileName = formatRecoveredEditsFileName(walEntry.getKey().getSequenceId());
+ String fileName = formatRecoveredEditsFileName(seqId);
fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
return new Path(dir, fileName);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 5d07061..a435c78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -38,9 +38,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -49,12 +47,10 @@
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -107,11 +103,13 @@
// the file being split currently
private FileStatus fileBeingSplit;
- // if we limit the number of writers opened for sinking recovered edits
- private final boolean splitWriterCreationBounded;
+ private final String tmpDirName;
public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
-
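+ // Conf keys for tuning split-log buffering and writer-thread parallelism.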
+ public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
+ public final static String SPLIT_WAL_WRITER_THREADS =
+ "hbase.regionserver.hlog.splitlog.writer.threads";
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
@@ -127,20 +124,23 @@
this.walFactory = factory;
PipelineController controller = new PipelineController();
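+ // Directory used to sideline pre-existing recovered-edits files during a re-split.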
+ this.tmpDirName =
+ conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
- this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
-
- entryBuffers = new EntryBuffers(controller,
- this.conf.getLong("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
- splitWriterCreationBounded);
-
- int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+ // if we limit the number of writers opened for sinking recovered edits
+ boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
+ long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
+ int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
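+ // Bounded mode limits the number of writers opened for sinking recovered edits.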
if (splitWriterCreationBounded) {
+ entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
outputSink =
- new BoundedLogWriterCreationOutputSink(this, controller, entryBuffers, numWriterThreads);
+ new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
} else {
+ entryBuffers = new EntryBuffers(controller, bufferSize);
outputSink =
- new LogRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
+ new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
}
}
@@ -152,6 +150,10 @@
return fileBeingSplit;
}
+ String getTmpDirName() {
+ return this.tmpDirName;
+ }
+
Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
return regionMaxSeqIdInStores;
}
@@ -215,7 +217,7 @@
int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
Path logPath = logfile.getPath();
boolean outputSinkStarted = false;
- boolean progress_failed = false;
+ boolean progressFailed = false;
int editsCount = 0;
int editsSkipped = 0;
@@ -230,7 +232,7 @@
logLength);
status.setStatus("Opening log file");
if (reporter != null && !reporter.progress()) {
- progress_failed = true;
+ progressFailed = true;
return false;
}
logFileReader = getReader(logfile, skipErrors, reporter);
@@ -288,11 +290,11 @@
if (editsCount % interval == 0
|| moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
numOpenedFilesLastCheck = this.getNumOpenWriters();
- String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
+ String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
+ " edits, skipped " + editsSkipped + " edits.";
status.setStatus("Split " + countsStr);
if (reporter != null && !reporter.progress()) {
- progress_failed = true;
+ progressFailed = true;
return false;
}
}
@@ -326,9 +328,9 @@
try {
if (outputSinkStarted) {
- // Set progress_failed to true as the immediate following statement will reset its value
+ // Set progressFailed to true as the immediately following statement will reset its value
- // when finishWritingAndClose() throws exception, progress_failed has the right value
- progress_failed = true;
- progress_failed = outputSink.finishWritingAndClose() == null;
+ // when close() throws an exception, progressFailed has the right value
+ progressFailed = true;
+ progressFailed = outputSink.close() == null;
}
} finally {
long processCost = EnvironmentEdgeManager.currentTime() - startTS;
@@ -337,18 +339,18 @@
outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
" ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
- ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
+ ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
LOG.info(msg);
status.markComplete(msg);
}
}
- return !progress_failed;
+ return !progressFailed;
}
/**
* Create a new {@link Reader} for reading logs to split.
*/
- protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
+ private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
throws IOException, CorruptedLogFileException {
Path path = file.getPath();
long length = file.getLen();
@@ -392,7 +394,7 @@
return in;
}
- static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+ private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
throws CorruptedLogFileException, IOException {
try {
return in.next();
@@ -475,98 +477,6 @@
}
}
- /**
- * A buffer of some number of edits for a given region.
- * This accumulates edits and also provides a memory optimization in order to
- * share a single byte array instance for the table and region name.
- * Also tracks memory usage of the accumulated edits.
- */
- public static class RegionEntryBuffer implements HeapSize {
- long heapInBuffer = 0;
- List<Entry> entryBuffer;
- TableName tableName;
- byte[] encodedRegionName;
-
- RegionEntryBuffer(TableName tableName, byte[] region) {
- this.tableName = tableName;
- this.encodedRegionName = region;
- this.entryBuffer = new ArrayList<>();
- }
-
- long appendEntry(Entry entry) {
- internify(entry);
- entryBuffer.add(entry);
- long incrHeap = entry.getEdit().heapSize() +
- ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
- 0; // TODO linkedlist entry
- heapInBuffer += incrHeap;
- return incrHeap;
- }
-
- private void internify(Entry entry) {
- WALKeyImpl k = entry.getKey();
- k.internTableName(this.tableName);
- k.internEncodedRegionName(this.encodedRegionName);
- }
-
- @Override
- public long heapSize() {
- return heapInBuffer;
- }
-
- public byte[] getEncodedRegionName() {
- return encodedRegionName;
- }
-
- public List<Entry> getEntryBuffer() {
- return entryBuffer;
- }
-
- public TableName getTableName() {
- return tableName;
- }
- }
-
- /**
- * Class wraps the actual writer which writes data out and related statistics
- */
- public abstract static class SinkWriter {
- /* Count of edits written to this path */
- long editsWritten = 0;
- /* Count of edits skipped to this path */
- long editsSkipped = 0;
- /* Number of nanos spent writing to this log */
- long nanosSpent = 0;
-
- void incrementEdits(int edits) {
- editsWritten += edits;
- }
-
- void incrementSkippedEdits(int skipped) {
- editsSkipped += skipped;
- }
-
- void incrementNanoTime(long nanos) {
- nanosSpent += nanos;
- }
- }
-
- /**
- * Private data structure that wraps a Writer and its Path, also collecting statistics about the
- * data written to this output.
- */
- final static class WriterAndPath extends SinkWriter {
- final Path path;
- final Writer writer;
- final long minLogSeqNum;
-
- WriterAndPath(final Path path, final Writer writer, final long minLogSeqNum) {
- this.path = path;
- this.writer = writer;
- this.minLogSeqNum = minLogSeqNum;
- }
- }
-
static class CorruptedLogFileException extends Exception {
private static final long serialVersionUID = 1L;
@@ -583,6 +493,5 @@
CorruptedLogFileException(String message, Throwable cause) {
super(message, cause);
}
-
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
index 741d449..4d6600d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
@@ -40,7 +40,6 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -134,7 +133,7 @@
@Test
public void testRegionEntryBuffer() throws Exception {
- WALSplitter.RegionEntryBuffer reb = new WALSplitter.RegionEntryBuffer(
+ EntryBuffers.RegionEntryBuffer reb = new EntryBuffers.RegionEntryBuffer(
TEST_TABLE, TEST_REGION);
assertEquals(0, reb.heapSize());
@@ -153,7 +152,7 @@
assertTrue(sink.totalBuffered > 0);
long amountInChunk = sink.totalBuffered;
// Get a chunk
- RegionEntryBuffer chunk = sink.getChunkToWrite();
+ EntryBuffers.RegionEntryBuffer chunk = sink.getChunkToWrite();
assertEquals(chunk.heapSize(), amountInChunk);
// Make sure it got marked that a thread is "working on this"
@@ -172,7 +171,7 @@
// to get the second
sink.doneWriting(chunk);
- RegionEntryBuffer chunk2 = sink.getChunkToWrite();
+ EntryBuffers.RegionEntryBuffer chunk2 = sink.getChunkToWrite();
assertNotNull(chunk2);
assertNotSame(chunk, chunk2);
long amountInChunk2 = sink.totalBuffered;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index cfeaac8..8ddd0ea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -407,15 +407,15 @@
WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
}
- private Path createRecoveredEditsPathForRegion() throws IOException{
+ private Path createRecoveredEditsPathForRegion() throws IOException {
byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
long now = System.currentTimeMillis();
- Entry entry =
- new Entry(new WALKeyImpl(encoded,
- TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
+ Entry entry = new Entry(
+ new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
new WALEdit());
- Path p = WALSplitUtil.getRegionSplitEditsPath(entry,
- FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
+ Path p = WALSplitUtil
+ .getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT,
+ TMPDIRNAME, conf);
return p;
}