(TEPHRA-243) Improve logging for slow log append and fix concurrency issues in transaction log writer
This closes #53 from Github.
Signed-off-by: anew <anew@apache.org>
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 3a6b70a..bb4b139 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -384,6 +384,15 @@
public static final String NUM_ENTRIES_APPENDED = "count";
public static final String VERSION_KEY = "version";
public static final byte CURRENT_VERSION = 3;
+
+ /**
+ * Time limit, in milliseconds, of an append to the transaction log before we log it as "slow".
+ */
+ public static final String CFG_SLOW_APPEND_THRESHOLD = "data.tx.log.slow.append.threshold";
+ /**
+ * Default value for the threshold in milli seconds for slow log append warnings.
+ */
+ public static final long DEFAULT_SLOW_APPEND_THRESHOLD = 1000;
}
/**
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
index cf97c92..eba5a1f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
@@ -19,9 +19,11 @@
package org.apache.tephra.persist;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
+import com.google.common.base.Stopwatch;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.tephra.TxConstants;
import org.apache.tephra.metrics.MetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,17 +31,20 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Collections;
import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
/**
* Common implementation of a transaction log, backed by file reader and writer based storage. Classes extending
* this class, must also implement {@link TransactionLogWriter} and {@link TransactionLogReader}.
+ *
+ * It is important to call close() on this class to ensure that all writes are synced and the log files are closed.
*/
public abstract class AbstractTransactionLog implements TransactionLog {
- /** Time limit, in milliseconds, of an append to the transaction log before we log it as "slow". */
- private static final long SLOW_APPEND_THRESHOLD = 1000L;
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class);
@@ -47,22 +52,32 @@
private final MetricsCollector metricsCollector;
protected long timestamp;
private volatile boolean initialized;
+ private volatile boolean closing;
private volatile boolean closed;
- private AtomicLong syncedUpTo = new AtomicLong();
- private List<Entry> pendingWrites = Lists.newLinkedList();
+ private long writtenUpTo = 0L;
+ private volatile long syncedUpTo = 0L;
+ private final Queue<Entry> pendingWrites = new ConcurrentLinkedQueue<>();
private TransactionLogWriter writer;
- public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
+ private int countSinceLastSync = 0;
+ private long positionBeforeWrite = -1L;
+ private final Stopwatch stopWatch = new Stopwatch();
+
+ private final long slowAppendThreshold;
+
+ AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector, Configuration conf) {
this.timestamp = timestamp;
this.metricsCollector = metricsCollector;
+ this.slowAppendThreshold = conf.getLong(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD,
+ TxConstants.TransactionLog.DEFAULT_SLOW_APPEND_THRESHOLD);
}
/**
- * Initializes the log file, opening a file writer. Clients calling {@code init()} should ensure that they
- * also call {@link HDFSTransactionLog#close()}.
+ * Initializes the log file, opening a file writer.
+ *
* @throws java.io.IOException If an error is encountered initializing the file writer.
*/
- public synchronized void init() throws IOException {
+ private synchronized void init() throws IOException {
if (initialized) {
return;
}
@@ -85,105 +100,147 @@
@Override
public void append(TransactionEdit edit) throws IOException {
- long startTime = System.nanoTime();
- synchronized (this) {
- ensureAvailable();
-
- Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
-
- // add to pending edits
- append(entry);
- }
-
- // wait for sync to complete
- sync();
- long durationMillis = (System.nanoTime() - startTime) / 1000000L;
- if (durationMillis > SLOW_APPEND_THRESHOLD) {
- LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
- }
+ append(Collections.singletonList(edit));
}
@Override
public void append(List<TransactionEdit> edits) throws IOException {
- long startTime = System.nanoTime();
- synchronized (this) {
- ensureAvailable();
-
- for (TransactionEdit edit : edits) {
- Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
-
- // add to pending edits
- append(entry);
- }
- }
-
- // wait for sync to complete
- sync();
- long durationMillis = (System.nanoTime() - startTime) / 1000000L;
- if (durationMillis > SLOW_APPEND_THRESHOLD) {
- LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
- }
- }
-
- private void ensureAvailable() throws IOException {
- if (closed) {
- throw new IOException("Log " + getName() + " is already closed, cannot append!");
+ if (closing) { // or closed, which implies closing
+ throw new IOException("Log " + getName() + " is closing or already closed, cannot append");
}
if (!initialized) {
init();
}
- }
-
- /*
- * Appends new writes to the pendingWrites. It is better to keep it in
- * our own queue rather than writing it to the HDFS output stream because
- * HDFSOutputStream.writeChunk is not lightweight at all.
- */
- private void append(Entry e) throws IOException {
- pendingWrites.add(e);
- }
-
- // Returns all currently pending writes. New writes
- // will accumulate in a new list.
- private List<Entry> getPendingWrites() {
- synchronized (this) {
- List<Entry> save = this.pendingWrites;
- this.pendingWrites = new LinkedList<>();
- return save;
+ // synchronizing here ensures that elements in the queue are ordered by seq number
+ synchronized (logSequence) {
+ for (TransactionEdit edit : edits) {
+ pendingWrites.add(new Entry(new LongWritable(logSequence.getAndIncrement()), edit));
+ }
}
+ // try to sync all pending edits (competing for this with other threads)
+ sync();
+ }
+
+ /**
+ * Return all pending writes at the time the method is called, or null if no writes are pending.
+ *
+ * Note that after this method returns, there can be additional pending writes,
+ * added concurrently while the existing pending writes are removed.
+ */
+ @Nullable
+ private Entry[] getPendingWrites() {
+ synchronized (this) {
+ if (pendingWrites.isEmpty()) {
+ return null;
+ }
+ Entry[] entriesToSync = new Entry[pendingWrites.size()];
+ for (int i = 0; i < entriesToSync.length; i++) {
+ entriesToSync[i] = pendingWrites.remove();
+ }
+ return entriesToSync;
+ }
+ }
+
+ /**
+ * When multiple threads try to log edits at the same time, they all will call (@link #append}
+ * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
+ * are followed by a single {@code sync}, or vice versa.
+ *
+ * We want to record the time and position of the first {@code append()} after a {@code sync()},
+ * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold.
+ * Therefore this is called every time before we write the pending list out to the log writer.
+ *
+ * See {@link #stopTimer(TransactionLogWriter)}.
+ *
+ * @throws IOException if the position of the writer cannot be determined
+ */
+ private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
+ // no sync needed because this is only called within a sync block
+ if (positionBeforeWrite == -1L) {
+ positionBeforeWrite = writer.getPosition();
+ countSinceLastSync = 0;
+ stopWatch.reset().start();
+ }
+ countSinceLastSync += entryCount;
+ }
+
+ /**
+ * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync
+ * together exceed a threshold.
+ *
+ * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
+ *
+ * @throws IOException if the position of the writer cannot be determined
+ */
+ private void stopTimer(TransactionLogWriter writer) throws IOException {
+ // this method is only called by a thread if it actually called sync(), inside a sync block
+ if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
+ stopWatch.stop();
+ long elapsed = stopWatch.elapsedMillis();
+ long bytesWritten = writer.getPosition() - positionBeforeWrite;
+ if (elapsed >= slowAppendThreshold) {
+ LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
+ getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
+ }
+ metricsCollector.histogram("wal.sync.size", countSinceLastSync);
+ metricsCollector.histogram("wal.sync.bytes", (int) bytesWritten); // single sync won't exceed max int
+ }
+ positionBeforeWrite = -1L;
+ countSinceLastSync = 0;
}
private void sync() throws IOException {
// writes out pending entries to the HLog
- TransactionLogWriter tmpWriter = null;
long latestSeq = 0;
int entryCount = 0;
synchronized (this) {
if (closed) {
- return;
+ if (pendingWrites.isEmpty()) {
+ // this expected: close() sets closed to true after syncing all pending writes (including ours)
+ return;
+ }
+ // this should never happen because close() only sets closed=true after syncing.
+ // but if it should happen, we must fail this call because we don't know whether the edit was persisted
+ throw new IOException(
+ "Unexpected state: Writer is closed but there are pending edits. Cannot guarantee that edits were persisted");
}
- // prevent writer being dereferenced
- tmpWriter = writer;
-
- List<Entry> currentPending = getPendingWrites();
- if (!currentPending.isEmpty()) {
- tmpWriter.commitMarker(currentPending.size());
- }
-
- // write out all accumulated entries to log.
- for (Entry e : currentPending) {
- tmpWriter.append(e);
- entryCount++;
- latestSeq = Math.max(latestSeq, e.getKey().get());
+ Entry[] currentPending = getPendingWrites();
+ if (currentPending != null) {
+ entryCount = currentPending.length;
+ startTimerIfNeeded(writer, entryCount);
+ writer.commitMarker(entryCount);
+ for (Entry e : currentPending) {
+ writer.append(e);
+ }
+ // sequence are guaranteed to be ascending, so the last one is the greatest
+ latestSeq = currentPending[currentPending.length - 1].getKey().get();
+ writtenUpTo = latestSeq;
}
}
- long lastSynced = syncedUpTo.get();
+ // giving up the sync lock here allows other threads to write their edits before the sync happens.
+ // hence, we can have the edits from n threads in one sync.
+
// someone else might have already synced our edits, avoid double syncing
- if (lastSynced < latestSeq) {
- tmpWriter.sync();
- metricsCollector.histogram("wal.sync.size", entryCount);
- syncedUpTo.compareAndSet(lastSynced, latestSeq);
+ // Note: latestSeq is a local variable and syncedUpTo is volatile; hence this is safe without synchronization
+ if (syncedUpTo >= latestSeq) {
+ return;
+ }
+ synchronized (this) {
+ // check again - someone else might have synced our edits while we were waiting to synchronize
+ if (syncedUpTo >= latestSeq) {
+ return;
+ }
+ if (closed) {
+ // this should never happen because close() only sets closed=true after syncing.
+ // but if it should happen, we must fail this call because we don't know whether the edit was persisted
+ throw new IOException(String.format(
+ "Unexpected state: Writer is closed but there are unsynced edits up to sequence id %d, and writes have " +
+ "been synced up to sequence id %d. Cannot guarantee that edits are persisted.", latestSeq, syncedUpTo));
+ }
+ writer.sync();
+ syncedUpTo = writtenUpTo;
+ stopTimer(writer);
}
}
@@ -192,6 +249,9 @@
if (closed) {
return;
}
+ // prevent other threads from adding more edits to the pending queue
+ closing = true;
+
// perform a final sync if any outstanding writes
if (!pendingWrites.isEmpty()) {
sync();
@@ -251,20 +311,21 @@
}
// package private for testing
+ @SuppressWarnings("deprecation")
@Deprecated
@VisibleForTesting
static class CaskEntry implements Writable {
private LongWritable key;
private co.cask.tephra.persist.TransactionEdit edit;
-
// for Writable
+ @SuppressWarnings("unused")
public CaskEntry() {
this.key = new LongWritable();
this.edit = new co.cask.tephra.persist.TransactionEdit();
}
- public CaskEntry(LongWritable key, co.cask.tephra.persist.TransactionEdit edit) {
+ CaskEntry(LongWritable key, co.cask.tephra.persist.TransactionEdit edit) {
this.key = key;
this.edit = edit;
}
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
index ba781ac..0d9b235 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
@@ -50,9 +50,9 @@
* @param hConf HDFS cluster configuration.
* @param logPath Path to the log file.
*/
- public HDFSTransactionLog(final FileSystem fs, final Configuration hConf,
- final Path logPath, long timestamp, MetricsCollector metricsCollector) {
- super(timestamp, metricsCollector);
+ HDFSTransactionLog(final FileSystem fs, final Configuration hConf,
+ final Path logPath, long timestamp, MetricsCollector metricsCollector) {
+ super(timestamp, metricsCollector, hConf);
this.fs = fs;
this.hConf = hConf;
this.logPath = logPath;
@@ -73,7 +73,7 @@
FileStatus status = fs.getFileStatus(logPath);
long length = status.getLen();
- TransactionLogReader reader = null;
+ TransactionLogReader reader;
// check if this file needs to be recovered due to failure
// Check for possibly empty file. With appends, currently Hadoop reports a
// zero length even if the file has been sync'd. Revisit if HDFS-376 or
@@ -82,38 +82,36 @@
LOG.warn("File " + logPath + " might be still open, length is 0");
}
+ HDFSUtil hdfsUtil = new HDFSUtil();
+ hdfsUtil.recoverFileLease(fs, logPath, hConf);
try {
- HDFSUtil hdfsUtil = new HDFSUtil();
- hdfsUtil.recoverFileLease(fs, logPath, hConf);
- try {
- FileStatus newStatus = fs.getFileStatus(logPath);
- LOG.info("New file size for " + logPath + " is " + newStatus.getLen());
- SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf);
- reader = new HDFSTransactionLogReaderSupplier(fileReader).get();
- } catch (EOFException e) {
- if (length <= 0) {
- // TODO should we ignore an empty, not-last log file if skip.errors
- // is false? Either way, the caller should decide what to do. E.g.
- // ignore if this is the last log in sequence.
- // TODO is this scenario still possible if the log has been
- // recovered (i.e. closed)
- LOG.warn("Could not open " + logPath + " for reading. File is empty", e);
- return null;
- } else {
- // EOFException being ignored
- return null;
- }
+ FileStatus newStatus = fs.getFileStatus(logPath);
+ LOG.info("New file size for " + logPath + " is " + newStatus.getLen());
+ SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf);
+ reader = new HDFSTransactionLogReaderSupplier(fileReader).get();
+ } catch (EOFException e) {
+ if (length <= 0) {
+ // TODO should we ignore an empty, not-last log file if skip.errors
+ // is false? Either way, the caller should decide what to do. E.g.
+ // ignore if this is the last log in sequence.
+ // TODO is this scenario still possible if the log has been
+ // recovered (i.e. closed)
+ LOG.warn("Could not open " + logPath + " for reading. File is empty", e);
+ return null;
+ } else {
+ // EOFException being ignored
+ return null;
}
- } catch (IOException e) {
- throw e;
}
return reader;
}
@VisibleForTesting
static final class LogWriter implements TransactionLogWriter {
+
private final SequenceFile.Writer internalWriter;
- public LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException {
+
+ LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException {
// TODO: retry a few times to ride over transient failures?
SequenceFile.Metadata metadata = new SequenceFile.Metadata();
metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
@@ -125,6 +123,11 @@
}
@Override
+ public long getPosition() throws IOException {
+ return internalWriter.getLength();
+ }
+
+ @Override
public void append(Entry entry) throws IOException {
internalWriter.append(entry.getKey(), entry.getEdit());
}
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
index d81ba38..416aae7 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
@@ -18,6 +18,7 @@
package org.apache.tephra.persist;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.metrics.MetricsCollector;
import java.io.BufferedInputStream;
@@ -40,8 +41,8 @@
* Creates a new transaction log using the given file instance.
* @param logFile The log file to use.
*/
- public LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector) {
- super(timestamp, metricsCollector);
+ LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector, Configuration conf) {
+ super(timestamp, metricsCollector, conf);
this.logFile = logFile;
}
@@ -64,12 +65,17 @@
private final FileOutputStream fos;
private final DataOutputStream out;
- public LogWriter(File logFile) throws IOException {
+ LogWriter(File logFile) throws IOException {
this.fos = new FileOutputStream(logFile);
this.out = new DataOutputStream(new BufferedOutputStream(fos, LocalFileTransactionStateStorage.BUFFER_SIZE));
}
@Override
+ public long getPosition() throws IOException {
+ return fos.getChannel().position();
+ }
+
+ @Override
public void append(Entry entry) throws IOException {
entry.write(out);
}
@@ -97,7 +103,7 @@
private final DataInputStream in;
private Entry reuseEntry = new Entry();
- public LogReader(File logFile) throws IOException {
+ LogReader(File logFile) throws IOException {
this.fin = new FileInputStream(logFile);
this.in = new DataInputStream(new BufferedInputStream(fin, LocalFileTransactionStateStorage.BUFFER_SIZE));
}
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
index beddbb2..3edb909 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
@@ -61,6 +61,7 @@
};
private final String configuredSnapshotDir;
+ private final Configuration conf;
private final MetricsCollector metricsCollector;
private File snapshotDir;
@@ -69,6 +70,7 @@
MetricsCollector metricsCollector) {
super(codecProvider);
this.configuredSnapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR);
+ this.conf = conf;
this.metricsCollector = metricsCollector;
}
@@ -220,7 +222,7 @@
@Nullable
@Override
public TransactionLog apply(@Nullable TimestampedFilename input) {
- return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector);
+ return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector, conf);
}
});
}
@@ -229,7 +231,7 @@
public TransactionLog createLog(long timestamp) throws IOException {
File newLogFile = new File(snapshotDir, LOG_FILE_PREFIX + timestamp);
LOG.info("Creating new transaction log at {}", newLogFile.getAbsolutePath());
- return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector);
+ return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector, conf);
}
@Override
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
index 14893ac..ec69548 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
@@ -43,6 +43,11 @@
void commitMarker(int count) throws IOException;
/**
+ * @return the current position in the output.
+ */
+ long getPosition() throws IOException;
+
+ /**
* Syncs any pending transaction edits added through {@link #append(AbstractTransactionLog.Entry)},
* but not yet flushed to durable storage.
*
diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
index 757b620..e295b95 100644
--- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
@@ -29,7 +29,7 @@
import org.apache.tephra.distributed.RetryNTimes;
import org.apache.tephra.distributed.RetryStrategy;
import org.apache.tephra.distributed.TransactionService;
-import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.LocalFileTransactionStateStorage;
import org.apache.tephra.persist.TransactionStateStorage;
import org.apache.tephra.runtime.ConfigModule;
import org.apache.tephra.runtime.DiscoveryModules;
@@ -52,6 +52,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ThriftTransactionSystemTest extends TransactionSystemTest {
@@ -79,6 +80,8 @@
// we want to use a retry strategy that lets us query the number of times it retried:
conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, CountingRetryStrategyProvider.class.getName());
conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 2);
+ conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, tmpFolder.newFolder().toString());
Injector injector = Guice.createInjector(
new ConfigModule(conf),
@@ -88,7 +91,7 @@
.with(new AbstractModule() {
@Override
protected void configure() {
- bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+ bind(TransactionStateStorage.class).to(LocalFileTransactionStateStorage.class).in(Scopes.SINGLETON);
bind(TransactionService.class).to(TestTransactionService.class).in(Scopes.SINGLETON);
}
}),
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
index 7a34e55..f53264b 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
@@ -191,6 +191,7 @@
List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
long timestamp = System.currentTimeMillis();
Configuration configuration = getConfiguration();
+ configuration.set(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD, "0");
FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber);
AtomicLong logSequence = new AtomicLong();