blob: 7bf33a93f40cb4e9d427b16782bff937ed286341 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.buffer;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.CheckpointManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALNodeClosedException;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.db.utils.MmapUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode.DEFAULT_SEARCH_INDEX;
/**
* This buffer guarantees the concurrent safety and uses double buffers mechanism to accelerate
* writes and avoid waiting for buffer syncing to disk.
*/
public class WALBuffer extends AbstractWALBuffer {
private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final int HALF_WAL_BUFFER_SIZE = config.getWalBufferSize() / 2;
private static final double FSYNC_BUFFER_RATIO = 0.95;
private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();
// whether close method is called
private volatile boolean isClosed = false;
// manage checkpoints
private final CheckpointManager checkpointManager;
// WALEntries
private final BlockingQueue<WALEntry> walEntries = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
// lock to provide synchronization for double buffers mechanism, protecting buffers status
private final Lock buffersLock = new ReentrantLock();
// condition to guarantee correctness of switching buffers
private final Condition idleBufferReadyCondition = buffersLock.newCondition();
// last writer position when fsync is called, help record each entry's position
private long lastFsyncPosition;
// region these variables should be protected by buffersLock
/** two buffers switch between three statuses (there is always 1 buffer working). */
// buffer in working status, only updated by serializeThread
// it's safe to use volatile here to make this reference thread-safe.
@SuppressWarnings("squid:S3077")
private volatile ByteBuffer workingBuffer;
// buffer in idle status
// it's safe to use volatile here to make this reference thread-safe.
@SuppressWarnings("squid:S3077")
private volatile ByteBuffer idleBuffer;
// buffer in syncing status, serializeThread makes sure no more writes to syncingBuffer
// it's safe to use volatile here to make this reference thread-safe.
@SuppressWarnings("squid:S3077")
private volatile ByteBuffer syncingBuffer;
// endregion
// file status of working buffer, updating file writer's status when syncing
protected volatile WALFileStatus currentFileStatus;
// single thread to serialize WALEntry to workingBuffer
private final ExecutorService serializeThread;
// single thread to sync syncingBuffer to disk
private final ExecutorService syncBufferThread;
// manage wal files which have MemTableIds
private final Map<Long, Set<Long>> memTableIdsOfWal = new ConcurrentHashMap<>();
public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
this(identifier, logDirectory, new CheckpointManager(identifier, logDirectory), 0, 0L);
}
public WALBuffer(
String identifier,
String logDirectory,
CheckpointManager checkpointManager,
long startFileVersion,
long startSearchIndex)
throws FileNotFoundException {
super(identifier, logDirectory, startFileVersion, startSearchIndex);
this.checkpointManager = checkpointManager;
currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
allocateBuffers();
serializeThread =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
ThreadName.WAL_SERIALIZE.getName() + "(node-" + identifier + ")");
syncBufferThread =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
ThreadName.WAL_SYNC.getName() + "(node-" + identifier + ")");
// start receiving serialize tasks
serializeThread.submit(new SerializeTask());
}
private void allocateBuffers() {
try {
workingBuffer = ByteBuffer.allocateDirect(HALF_WAL_BUFFER_SIZE);
idleBuffer = ByteBuffer.allocateDirect(HALF_WAL_BUFFER_SIZE);
} catch (OutOfMemoryError e) {
logger.error("Fail to allocate wal node-{}'s buffer because out of memory.", identifier, e);
close();
throw e;
}
}
@TestOnly
public void setBufferSize(int size) {
buffersLock.lock();
try {
if (workingBuffer != null) {
MmapUtil.clean((MappedByteBuffer) workingBuffer);
}
if (idleBuffer != null) {
MmapUtil.clean((MappedByteBuffer) workingBuffer);
}
if (syncingBuffer != null) {
MmapUtil.clean((MappedByteBuffer) syncingBuffer);
}
workingBuffer = ByteBuffer.allocateDirect(size / 2);
idleBuffer = ByteBuffer.allocateDirect(size / 2);
} catch (OutOfMemoryError e) {
logger.error("Fail to allocate wal node-{}'s buffer because out of memory.", identifier, e);
close();
throw e;
} finally {
buffersLock.unlock();
}
}
@Override
public void write(WALEntry walEntry) {
if (isClosed) {
logger.error(
"Fail to write WALEntry into wal node-{} because this node is closed.", identifier);
walEntry.getWalFlushListener().fail(new WALNodeClosedException(identifier));
return;
}
// just add this WALEntry to queue
try {
walEntries.put(walEntry);
} catch (InterruptedException e) {
logger.warn("Interrupted when waiting for adding WALEntry to buffer.");
Thread.currentThread().interrupt();
}
}
// region Task of serializeThread
/** This info class traverses some extra info from serializeThread to syncBufferThread. */
private static class SerializeInfo {
final WALMetaData metaData = new WALMetaData();
final Map<Long, Long> memTableId2WalDiskUsage = new HashMap<>();
final List<Checkpoint> checkpoints = new ArrayList<>();
final List<WALFlushListener> fsyncListeners = new ArrayList<>();
WALFlushListener rollWALFileWriterListener = null;
}
/** This task serializes WALEntry to workingBuffer and will call fsync at last. */
private class SerializeTask implements Runnable {
private final ByteBufferView byteBufferView = new ByteBufferView();
private final SerializeInfo info = new SerializeInfo();
private int totalSize = 0;
@Override
public void run() {
try {
serialize();
} finally {
if (!isClosed) {
serializeThread.submit(new SerializeTask());
}
}
}
// In order to control memory usage of blocking queue, get 1 and then serialize 1
private void serialize() {
// try to get first WALEntry with blocking interface
long start = System.nanoTime();
try {
WALEntry firstWALEntry = walEntries.take();
boolean returnFlag = handleWALEntry(firstWALEntry);
if (returnFlag) {
WRITING_METRICS.recordSerializeWALEntryTotalCost(System.nanoTime() - start);
return;
}
} catch (InterruptedException e) {
logger.warn(
"Interrupted when waiting for taking WALEntry from blocking queue to serialize.");
Thread.currentThread().interrupt();
}
// try to get more WALEntries with blocking interface to enlarge write batch
while (totalSize < HALF_WAL_BUFFER_SIZE * FSYNC_BUFFER_RATIO) {
WALEntry walEntry = null;
try {
// for better fsync performance, wait a while to enlarge write batch
if (config.getWalMode().equals(WALMode.ASYNC)) {
walEntry =
walEntries.poll(config.getWalAsyncModeFsyncDelayInMs(), TimeUnit.MILLISECONDS);
} else {
walEntry =
walEntries.poll(config.getWalSyncModeFsyncDelayInMs(), TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
logger.warn(
"Interrupted when waiting for taking WALEntry from blocking queue to serialize.");
Thread.currentThread().interrupt();
}
if (walEntry == null) {
break;
}
boolean returnFlag = handleWALEntry(walEntry);
if (returnFlag) {
WRITING_METRICS.recordSerializeWALEntryTotalCost(System.nanoTime() - start);
return;
}
}
WRITING_METRICS.recordSerializeWALEntryTotalCost(System.nanoTime() - start);
// call fsync at last and set fsyncListeners
if (totalSize > 0 || !info.checkpoints.isEmpty()) {
fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
}
}
/**
* Handle wal info and signal entry.
*
* @return true if fsyncWorkingBuffer has been called, which means this serialization task
* should be ended.
*/
private boolean handleWALEntry(WALEntry walEntry) {
if (walEntry.isSignal()) {
return handleSignalEntry((WALSignalEntry) walEntry);
}
handleInfoEntry(walEntry);
return false;
}
/** Handle a normal info WALEntry. */
private void handleInfoEntry(WALEntry walEntry) {
if (walEntry.getType() == WALEntryType.MEMORY_TABLE_CHECKPOINT) {
info.checkpoints.add((Checkpoint) walEntry.getValue());
return;
}
int startPosition = byteBufferView.position();
int size;
try {
walEntry.serialize(byteBufferView);
size = byteBufferView.position() - startPosition;
} catch (Exception e) {
logger.error(
"Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
walEntry.getWalFlushListener().fail(e);
return;
}
// parse search index
long searchIndex = DEFAULT_SEARCH_INDEX;
if (walEntry.getType().needSearch()) {
if (walEntry.getType() == WALEntryType.DELETE_DATA_NODE) {
searchIndex = ((DeleteDataNode) walEntry.getValue()).getSearchIndex();
} else {
searchIndex = ((InsertNode) walEntry.getValue()).getSearchIndex();
}
if (searchIndex != DEFAULT_SEARCH_INDEX) {
currentSearchIndex = searchIndex;
currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
}
}
// update related info
totalSize += size;
info.metaData.add(size, searchIndex, walEntry.getMemTableId());
info.memTableId2WalDiskUsage.compute(
walEntry.getMemTableId(), (k, v) -> v == null ? size : v + size);
walEntry.getWalFlushListener().getWalEntryHandler().setSize(size);
info.fsyncListeners.add(walEntry.getWalFlushListener());
}
/**
* Handle a signal entry.
*
* @return true if fsyncWorkingBuffer has been called, which means this serialization task
* should be ended.
*/
private boolean handleSignalEntry(WALSignalEntry walSignalEntry) {
switch (walSignalEntry.getType()) {
case ROLL_WAL_LOG_WRITER_SIGNAL:
if (logger.isDebugEnabled()) {
logger.debug("Handle roll log writer signal for wal node-{}.", identifier);
}
info.rollWALFileWriterListener = walSignalEntry.getWalFlushListener();
fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
return true;
case CLOSE_SIGNAL:
if (logger.isDebugEnabled()) {
logger.debug(
"Handle close signal for wal node-{}, there are {} entries left.",
identifier,
walEntries.size());
}
boolean dataExists = totalSize > 0;
if (dataExists) {
fsyncWorkingBuffer(currentSearchIndex, currentFileStatus, info);
}
isClosed = true;
return dataExists;
default:
return false;
}
}
}
/**
* This view uses workingBuffer lock-freely because workingBuffer is only updated by
* serializeThread and this class is only used by serializeThread.
*/
private class ByteBufferView implements IWALByteBufferView {
private int flushedBytesNum = 0;
private void ensureEnoughSpace(int bytesNum) {
if (workingBuffer.remaining() < bytesNum) {
rollBuffer();
}
}
private void rollBuffer() {
flushedBytesNum += workingBuffer.position();
syncWorkingBuffer(currentSearchIndex, currentFileStatus);
}
@Override
public void put(byte b) {
ensureEnoughSpace(Byte.BYTES);
workingBuffer.put(b);
}
@Override
public void put(byte[] src) {
int offset = 0;
while (true) {
int leftCapacity = workingBuffer.remaining();
int needCapacity = src.length - offset;
if (leftCapacity >= needCapacity) {
workingBuffer.put(src, offset, needCapacity);
break;
} else {
workingBuffer.put(src, offset, leftCapacity);
offset += leftCapacity;
rollBuffer();
}
}
}
@Override
public void putChar(char value) {
ensureEnoughSpace(Character.BYTES);
workingBuffer.putChar(value);
}
@Override
public void putShort(short value) {
ensureEnoughSpace(Short.BYTES);
workingBuffer.putShort(value);
}
@Override
public void putInt(int value) {
ensureEnoughSpace(Integer.BYTES);
workingBuffer.putInt(value);
}
@Override
public void putLong(long value) {
ensureEnoughSpace(Long.BYTES);
workingBuffer.putLong(value);
}
@Override
public void putFloat(float value) {
ensureEnoughSpace(Float.BYTES);
workingBuffer.putFloat(value);
}
@Override
public void putDouble(double value) {
ensureEnoughSpace(Double.BYTES);
workingBuffer.putDouble(value);
}
@Override
public int position() {
return flushedBytesNum + workingBuffer.position();
}
}
/** Notice: this method only called when buffer is exhausted by SerializeTask. */
private void syncWorkingBuffer(long searchIndex, WALFileStatus fileStatus) {
switchWorkingBufferToFlushing();
syncBufferThread.submit(new SyncBufferTask(searchIndex, fileStatus, false));
currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
}
/** Notice: this method only called at the last of SerializeTask. */
private void fsyncWorkingBuffer(long searchIndex, WALFileStatus fileStatus, SerializeInfo info) {
switchWorkingBufferToFlushing();
syncBufferThread.submit(new SyncBufferTask(searchIndex, fileStatus, true, info));
currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
}
// only called by serializeThread
private void switchWorkingBufferToFlushing() {
buffersLock.lock();
try {
while (idleBuffer == null) {
idleBufferReadyCondition.await();
}
syncingBuffer = workingBuffer;
workingBuffer = idleBuffer;
workingBuffer.clear();
idleBuffer = null;
} catch (InterruptedException e) {
logger.warn("Interrupted When waiting for available working buffer.");
Thread.currentThread().interrupt();
} finally {
buffersLock.unlock();
}
}
// endregion
// region Task of syncBufferThread
/**
* This task syncs syncingBuffer to disk. The precondition is that syncingBuffer cannot be null.
*/
private class SyncBufferTask implements Runnable {
private final long searchIndex;
private final WALFileStatus fileStatus;
private final boolean forceFlag;
private final SerializeInfo info;
public SyncBufferTask(long searchIndex, WALFileStatus fileStatus, boolean forceFlag) {
this(searchIndex, fileStatus, forceFlag, null);
}
public SyncBufferTask(
long searchIndex, WALFileStatus fileStatus, boolean forceFlag, SerializeInfo info) {
this.searchIndex = searchIndex;
this.fileStatus = fileStatus;
this.forceFlag = forceFlag;
this.info = info == null ? new SerializeInfo() : info;
}
@Override
public void run() {
final long startTime = System.nanoTime();
makeMemTableCheckpoints();
long walFileVersionId = currentWALFileVersion;
currentWALFileWriter.updateFileStatus(fileStatus);
// calculate buffer used ratio
double usedRatio = (double) syncingBuffer.position() / syncingBuffer.capacity();
WRITING_METRICS.recordWALBufferUsedRatio(usedRatio);
logger.debug(
"Sync wal buffer, forceFlag: {}, buffer used: {} / {} = {}%",
forceFlag, syncingBuffer.position(), syncingBuffer.capacity(), usedRatio * 100);
// flush buffer to os
try {
currentWALFileWriter.write(syncingBuffer, info.metaData);
} catch (Throwable e) {
logger.error(
"Fail to sync wal node-{}'s buffer, change system mode to error.", identifier, e);
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
} finally {
switchSyncingBufferToIdle();
}
// update info
memTableIdsOfWal
.computeIfAbsent(currentWALFileVersion, memTableIds -> new HashSet<>())
.addAll(info.metaData.getMemTablesId());
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage);
boolean forceSuccess = false;
// try to roll log writer
if (info.rollWALFileWriterListener != null
|| (forceFlag && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
try {
rollLogWriter(searchIndex, currentWALFileWriter.getWalFileStatus());
forceSuccess = true;
if (info.rollWALFileWriterListener != null) {
info.rollWALFileWriterListener.succeed();
}
} catch (IOException e) {
logger.error(
"Fail to roll wal node-{}'s log writer, change system mode to error.", identifier, e);
if (info.rollWALFileWriterListener != null) {
info.rollWALFileWriterListener.fail(e);
}
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
}
} else if (forceFlag) { // force os cache to the storage device, avoid force twice by judging
// after rolling file
try {
currentWALFileWriter.force();
forceSuccess = true;
} catch (IOException e) {
logger.error(
"Fail to fsync wal node-{}'s log writer, change system mode to error.",
identifier,
e);
for (WALFlushListener fsyncListener : info.fsyncListeners) {
fsyncListener.fail(e);
}
CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
}
}
// notify all waiting listeners
if (forceSuccess) {
long position = lastFsyncPosition;
for (WALFlushListener fsyncListener : info.fsyncListeners) {
fsyncListener.succeed();
if (fsyncListener.getWalEntryHandler() != null) {
fsyncListener.getWalEntryHandler().setEntryPosition(walFileVersionId, position);
position += fsyncListener.getWalEntryHandler().getSize();
}
}
lastFsyncPosition = currentWALFileWriter.size();
}
WRITING_METRICS.recordWALBufferEntriesCount(info.fsyncListeners.size());
WRITING_METRICS.recordSyncWALBufferCost(System.nanoTime() - startTime, forceFlag);
}
private void makeMemTableCheckpoints() {
if (info.checkpoints.isEmpty()) {
return;
}
for (Checkpoint checkpoint : info.checkpoints) {
switch (checkpoint.getType()) {
case CREATE_MEMORY_TABLE:
checkpointManager.makeCreateMemTableCPOnDisk(
checkpoint.getMemTableInfos().get(0).getMemTableId());
break;
case FLUSH_MEMORY_TABLE:
checkpointManager.makeFlushMemTableCP(
checkpoint.getMemTableInfos().get(0).getMemTableId());
break;
default:
throw new RuntimeException(
"Cannot make other checkpoint types in the wal buffer, type is "
+ checkpoint.getType());
}
}
checkpointManager.fsyncCheckpointFile();
}
}
// only called by syncBufferThread
private void switchSyncingBufferToIdle() {
buffersLock.lock();
try {
// No need to judge whether idleBuffer is null because syncingBuffer is not null
// and there is only one buffer can be null between syncingBuffer and idleBuffer
idleBuffer = syncingBuffer;
syncingBuffer = null;
idleBufferReadyCondition.signalAll();
} finally {
buffersLock.unlock();
}
}
@Override
public void waitForFlush() throws InterruptedException {
buffersLock.lock();
try {
idleBufferReadyCondition.await();
} finally {
buffersLock.unlock();
}
}
@Override
public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException {
buffersLock.lock();
try {
return idleBufferReadyCondition.await(time, unit);
} finally {
buffersLock.unlock();
}
}
// endregion
@Override
public void close() {
// first waiting serialize and sync tasks finished, then release all resources
if (serializeThread != null) {
// add close signal WALEntry to notify serializeThread
try {
walEntries.put(new WALSignalEntry(WALEntryType.CLOSE_SIGNAL));
} catch (InterruptedException e) {
logger.error("Fail to put CLOSE_SIGNAL to walEntries.", e);
Thread.currentThread().interrupt();
}
isClosed = true;
shutdownThread(serializeThread, ThreadName.WAL_SERIALIZE);
}
if (syncBufferThread != null) {
shutdownThread(syncBufferThread, ThreadName.WAL_SYNC);
}
if (currentWALFileWriter != null) {
try {
currentWALFileWriter.close();
} catch (IOException e) {
logger.error("Fail to close wal node-{}'s log writer.", identifier, e);
}
}
checkpointManager.close();
if (workingBuffer != null) {
MmapUtil.clean((MappedByteBuffer) workingBuffer);
}
if (idleBuffer != null) {
MmapUtil.clean((MappedByteBuffer) workingBuffer);
}
if (syncingBuffer != null) {
MmapUtil.clean((MappedByteBuffer) syncingBuffer);
}
}
private void shutdownThread(ExecutorService thread, ThreadName threadName) {
thread.shutdown();
try {
if (!thread.awaitTermination(30, TimeUnit.SECONDS)) {
logger.warn("Waiting thread {} to be terminated is timeout", threadName.getName());
}
} catch (InterruptedException e) {
logger.warn("Thread {} still doesn't exit after 30s", threadName.getName());
Thread.currentThread().interrupt();
}
}
@Override
public boolean isAllWALEntriesConsumed() {
buffersLock.lock();
try {
return walEntries.isEmpty() && workingBuffer.position() == 0 && syncingBuffer == null;
} finally {
buffersLock.unlock();
}
}
public CheckpointManager getCheckpointManager() {
return checkpointManager;
}
public void removeMemTableIdsOfWal(Long walVersionId) {
this.memTableIdsOfWal.remove(walVersionId);
}
public Set<Long> getMemTableIds(long fileVersionId) {
if (fileVersionId >= currentWALFileVersion) {
return null;
}
return memTableIdsOfWal.computeIfAbsent(
fileVersionId,
id -> {
try {
File file = WALFileUtils.getWALFile(new File(logDirectory), id);
return WALMetaData.readFromWALFile(
file, FileChannel.open(file.toPath(), StandardOpenOption.READ))
.getMemTablesId();
} catch (IOException e) {
logger.warn(
"Fail to read memTable ids from the wal file {} of wal node {}.",
id,
identifier,
e);
return Collections.emptySet();
}
});
}
@TestOnly
public Map<Long, Set<Long>> getMemTableIdsOfWal() {
return memTableIdsOfWal;
}
}