(TEPHRA-243) Improve logging for slow log append and fix concurrency issues in transaction log writer

This closes #53 from GitHub.
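
The slow-append threshold is now read from the configuration key
data.tx.log.slow.append.threshold (default: 1000 ms). A minimal, hypothetical sketch of
lowering the threshold on the Hadoop Configuration handed to the transaction state
storage; the Configuration instance and the 250 ms value are illustrative, only the
TxConstants key and its default come from this change:

    // Hadoop Configuration that is passed to the transaction state storage / log
    Configuration conf = new Configuration();
    // log a "Slow append to log ..." message whenever a write+sync takes 250 ms or longer
    conf.setLong(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD, 250L);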

Signed-off-by: anew <anew@apache.org>
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 3a6b70a..bb4b139 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -384,6 +384,15 @@
     public static final String NUM_ENTRIES_APPENDED = "count";
     public static final String VERSION_KEY = "version";
     public static final byte CURRENT_VERSION = 3;
+
+    /**
+     * Time limit, in milliseconds, for an append to the transaction log before it is logged as "slow".
+     */
+    public static final String CFG_SLOW_APPEND_THRESHOLD = "data.tx.log.slow.append.threshold";
+    /**
+     * Default value, in milliseconds, for the slow log append warning threshold.
+     */
+    public static final long DEFAULT_SLOW_APPEND_THRESHOLD = 1000;
   }
 
   /**
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
index cf97c92..eba5a1f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
@@ -19,9 +19,11 @@
 package org.apache.tephra.persist;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
+import com.google.common.base.Stopwatch;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.tephra.TxConstants;
 import org.apache.tephra.metrics.MetricsCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,17 +31,20 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
 
 /**
  * Common implementation of a transaction log, backed by file reader and writer based storage.  Classes extending
  * this class, must also implement {@link TransactionLogWriter} and {@link TransactionLogReader}.
+ *
+ * It is important to call {@link #close()} to ensure that all writes are synced and the log file is closed.
  */
 public abstract class AbstractTransactionLog implements TransactionLog {
-  /** Time limit, in milliseconds, of an append to the transaction log before we log it as "slow". */
-  private static final long SLOW_APPEND_THRESHOLD = 1000L;
 
   private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class);
 
@@ -47,22 +52,32 @@
   private final MetricsCollector metricsCollector;
   protected long timestamp;
   private volatile boolean initialized;
+  private volatile boolean closing;
   private volatile boolean closed;
-  private AtomicLong syncedUpTo = new AtomicLong();
-  private List<Entry> pendingWrites = Lists.newLinkedList();
+  private long writtenUpTo = 0L;
+  private volatile long syncedUpTo = 0L;
+  private final Queue<Entry> pendingWrites = new ConcurrentLinkedQueue<>();
   private TransactionLogWriter writer;
 
-  public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
+  private int countSinceLastSync = 0;
+  private long positionBeforeWrite = -1L;
+  private final Stopwatch stopWatch = new Stopwatch();
+
+  private final long slowAppendThreshold;
+
+  AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector, Configuration conf) {
     this.timestamp = timestamp;
     this.metricsCollector = metricsCollector;
+    this.slowAppendThreshold = conf.getLong(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD,
+                                            TxConstants.TransactionLog.DEFAULT_SLOW_APPEND_THRESHOLD);
   }
 
   /**
-   * Initializes the log file, opening a file writer.  Clients calling {@code init()} should ensure that they
-   * also call {@link HDFSTransactionLog#close()}.
+   * Initializes the log file, opening a file writer.
+   *
    * @throws java.io.IOException If an error is encountered initializing the file writer.
    */
-  public synchronized void init() throws IOException {
+  private synchronized void init() throws IOException {
     if (initialized) {
       return;
     }
@@ -85,105 +100,147 @@
 
   @Override
   public void append(TransactionEdit edit) throws IOException {
-    long startTime = System.nanoTime();
-    synchronized (this) {
-      ensureAvailable();
-
-      Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
-
-      // add to pending edits
-      append(entry);
-    }
-
-    // wait for sync to complete
-    sync();
-    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
-    if (durationMillis > SLOW_APPEND_THRESHOLD) {
-      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
-    }
+    append(Collections.singletonList(edit));
   }
 
   @Override
   public void append(List<TransactionEdit> edits) throws IOException {
-    long startTime = System.nanoTime();
-    synchronized (this) {
-      ensureAvailable();
-
-      for (TransactionEdit edit : edits) {
-        Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
-
-        // add to pending edits
-        append(entry);
-      }
-    }
-
-    // wait for sync to complete
-    sync();
-    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
-    if (durationMillis > SLOW_APPEND_THRESHOLD) {
-      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
-    }
-  }
-
-  private void ensureAvailable() throws IOException {
-    if (closed) {
-      throw new IOException("Log " + getName() + " is already closed, cannot append!");
+    if (closing) { // or closed, which implies closing
+      throw new IOException("Log " + getName() + " is closing or already closed, cannot append");
     }
     if (!initialized) {
       init();
     }
-  }
-
-  /*
-   * Appends new writes to the pendingWrites. It is better to keep it in
-   * our own queue rather than writing it to the HDFS output stream because
-   * HDFSOutputStream.writeChunk is not lightweight at all.
-   */
-  private void append(Entry e) throws IOException {
-    pendingWrites.add(e);
-  }
-
-  // Returns all currently pending writes. New writes
-  // will accumulate in a new list.
-  private List<Entry> getPendingWrites() {
-    synchronized (this) {
-      List<Entry> save = this.pendingWrites;
-      this.pendingWrites = new LinkedList<>();
-      return save;
+    // synchronizing here ensures that elements in the queue are ordered by sequence number
+    synchronized (logSequence) {
+      for (TransactionEdit edit : edits) {
+        pendingWrites.add(new Entry(new LongWritable(logSequence.getAndIncrement()), edit));
+      }
     }
+    // try to sync all pending edits (competing for this with other threads)
+    sync();
+  }
+
+  /**
+   * Returns all writes that are pending at the time this method is called, or null if none are pending.
+   *
+   * Note that additional writes can be added concurrently while the existing pending writes are
+   * being removed, so new writes may already be pending by the time this method returns.
+   */
+  @Nullable
+  private Entry[] getPendingWrites() {
+    synchronized (this) {
+      if (pendingWrites.isEmpty()) {
+        return null;
+      }
+      Entry[] entriesToSync = new Entry[pendingWrites.size()];
+      for (int i = 0; i < entriesToSync.length; i++) {
+        entriesToSync[i] = pendingWrites.remove();
+      }
+      return entriesToSync;
+    }
+  }
+
+  /**
+   * When multiple threads try to log edits at the same time, they all call {@link #append(List)}
+   * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
+   * calls are followed by a single {@code sync()}, or vice versa.
+   *
+   * We want to record the time and position of the first {@code append()} after a {@code sync()},
+   * then measure the elapsed time after the next {@code sync()}, and log a message if it exceeds the threshold.
+   * Therefore, this method is called every time before the pending list is written out to the log writer.
+   *
+   * See {@link #stopTimer(TransactionLogWriter)}.
+   *
+   * @throws IOException if the position of the writer cannot be determined
+   */
+  private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
+    // no synchronization needed because this is only called within a synchronized block
+    if (positionBeforeWrite == -1L) {
+      positionBeforeWrite = writer.getPosition();
+      countSinceLastSync = 0;
+      stopWatch.reset().start();
+    }
+    countSinceLastSync += entryCount;
+  }
+
+  /**
+   * Called by {@link #sync()} after flushing to the file system. Logs a slow-append message if the
+   * write(s) and the sync together took longer than the configured threshold.
+   *
+   * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
+   *
+   * @throws IOException if the position of the writer cannot be determined
+   */
+  private void stopTimer(TransactionLogWriter writer) throws IOException {
+    // this method is only called by a thread that actually called sync(), inside a synchronized block
+    if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
+      stopWatch.stop();
+      long elapsed = stopWatch.elapsedMillis();
+      long bytesWritten = writer.getPosition() - positionBeforeWrite;
+      if (elapsed >= slowAppendThreshold) {
+        LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
+                 getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
+      }
+      metricsCollector.histogram("wal.sync.size", countSinceLastSync);
+      metricsCollector.histogram("wal.sync.bytes", (int) bytesWritten); // single sync won't exceed max int
+    }
+    positionBeforeWrite = -1L;
+    countSinceLastSync = 0;
   }
 
   private void sync() throws IOException {
     // writes out pending entries to the HLog
-    TransactionLogWriter tmpWriter = null;
     long latestSeq = 0;
     int entryCount = 0;
     synchronized (this) {
       if (closed) {
-        return;
+        if (pendingWrites.isEmpty()) {
+          // this is expected: close() sets closed to true only after syncing all pending writes (including ours)
+          return;
+        }
+        // this should never happen because close() only sets closed=true after syncing.
+        // but if it does happen, we must fail this call because we don't know whether the edits were persisted
+        throw new IOException(
+          "Unexpected state: Writer is closed but there are pending edits. Cannot guarantee that edits were persisted");
       }
-      // prevent writer being dereferenced
-      tmpWriter = writer;
-
-      List<Entry> currentPending = getPendingWrites();
-      if (!currentPending.isEmpty()) {
-        tmpWriter.commitMarker(currentPending.size());
-      }
-
-      // write out all accumulated entries to log.
-      for (Entry e : currentPending) {
-        tmpWriter.append(e);
-        entryCount++;
-        latestSeq = Math.max(latestSeq, e.getKey().get());
+      Entry[] currentPending = getPendingWrites();
+      if (currentPending != null) {
+        entryCount = currentPending.length;
+        startTimerIfNeeded(writer, entryCount);
+        writer.commitMarker(entryCount);
+        for (Entry e : currentPending) {
+          writer.append(e);
+        }
+        // sequence numbers are guaranteed to be ascending, so the last one is the greatest
+        latestSeq = currentPending[currentPending.length - 1].getKey().get();
+        writtenUpTo = latestSeq;
       }
     }
 
-    long lastSynced = syncedUpTo.get();
+    // giving up the lock here allows other threads to add their edits before the sync happens.
+    // hence, a single sync can cover the edits of multiple threads.
+
     // someone else might have already synced our edits, avoid double syncing
-    if (lastSynced < latestSeq) {
-      tmpWriter.sync();
-      metricsCollector.histogram("wal.sync.size", entryCount);
-      syncedUpTo.compareAndSet(lastSynced, latestSeq);
+    // Note: latestSeq is a local variable and syncedUpTo is volatile; hence this is safe without synchronization
+    if (syncedUpTo >= latestSeq) {
+      return;
+    }
+    synchronized (this) {
+      // check again - someone else might have synced our edits while we were waiting to synchronize
+      if (syncedUpTo >= latestSeq) {
+        return;
+      }
+      if (closed) {
+        // this should never happen because close() only sets closed=true after syncing.
+        // but if it does happen, we must fail this call because we don't know whether the edits were persisted
+        throw new IOException(String.format(
+          "Unexpected state: Writer is closed but there are unsynced edits up to sequence id %d, and writes have " +
+            "been synced up to sequence id %d. Cannot guarantee that edits are persisted.", latestSeq, syncedUpTo));
+      }
+      writer.sync();
+      syncedUpTo = writtenUpTo;
+      stopTimer(writer);
     }
   }
 
@@ -192,6 +249,9 @@
     if (closed) {
       return;
     }
+    // prevent other threads from adding more edits to the pending queue
+    closing = true;
+
     // perform a final sync if any outstanding writes
     if (!pendingWrites.isEmpty()) {
       sync();
@@ -251,20 +311,21 @@
   }
 
   // package private for testing
+  @SuppressWarnings("deprecation")
   @Deprecated
   @VisibleForTesting
   static class CaskEntry implements Writable {
     private LongWritable key;
     private co.cask.tephra.persist.TransactionEdit edit;
 
-
     // for Writable
+    @SuppressWarnings("unused")
     public CaskEntry() {
       this.key = new LongWritable();
       this.edit = new co.cask.tephra.persist.TransactionEdit();
     }
 
-    public CaskEntry(LongWritable key, co.cask.tephra.persist.TransactionEdit edit) {
+    CaskEntry(LongWritable key, co.cask.tephra.persist.TransactionEdit edit) {
       this.key = key;
       this.edit = edit;
     }
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
index ba781ac..0d9b235 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java
@@ -50,9 +50,9 @@
    * @param hConf HDFS cluster configuration.
    * @param logPath Path to the log file.
    */
-  public HDFSTransactionLog(final FileSystem fs, final Configuration hConf,
-                            final Path logPath, long timestamp, MetricsCollector metricsCollector) {
-    super(timestamp, metricsCollector);
+  HDFSTransactionLog(final FileSystem fs, final Configuration hConf,
+                     final Path logPath, long timestamp, MetricsCollector metricsCollector) {
+    super(timestamp, metricsCollector, hConf);
     this.fs = fs;
     this.hConf = hConf;
     this.logPath = logPath;
@@ -73,7 +73,7 @@
     FileStatus status = fs.getFileStatus(logPath);
     long length = status.getLen();
 
-    TransactionLogReader reader = null;
+    TransactionLogReader reader;
     // check if this file needs to be recovered due to failure
     // Check for possibly empty file. With appends, currently Hadoop reports a
     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
@@ -82,38 +82,36 @@
       LOG.warn("File " + logPath + " might be still open, length is 0");
     }
 
+    HDFSUtil hdfsUtil = new HDFSUtil();
+    hdfsUtil.recoverFileLease(fs, logPath, hConf);
     try {
-      HDFSUtil hdfsUtil = new HDFSUtil();
-      hdfsUtil.recoverFileLease(fs, logPath, hConf);
-      try {
-        FileStatus newStatus = fs.getFileStatus(logPath);
-        LOG.info("New file size for " + logPath + " is " + newStatus.getLen());
-        SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf);
-        reader = new HDFSTransactionLogReaderSupplier(fileReader).get();
-      } catch (EOFException e) {
-        if (length <= 0) {
-          // TODO should we ignore an empty, not-last log file if skip.errors
-          // is false? Either way, the caller should decide what to do. E.g.
-          // ignore if this is the last log in sequence.
-          // TODO is this scenario still possible if the log has been
-          // recovered (i.e. closed)
-          LOG.warn("Could not open " + logPath + " for reading. File is empty", e);
-          return null;
-        } else {
-          // EOFException being ignored
-          return null;
-        }
+      FileStatus newStatus = fs.getFileStatus(logPath);
+      LOG.info("New file size for " + logPath + " is " + newStatus.getLen());
+      SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf);
+      reader = new HDFSTransactionLogReaderSupplier(fileReader).get();
+    } catch (EOFException e) {
+      if (length <= 0) {
+        // TODO should we ignore an empty, not-last log file if skip.errors
+        // is false? Either way, the caller should decide what to do. E.g.
+        // ignore if this is the last log in sequence.
+        // TODO is this scenario still possible if the log has been
+        // recovered (i.e. closed)
+        LOG.warn("Could not open " + logPath + " for reading. File is empty", e);
+        return null;
+      } else {
+        // EOFException being ignored
+        return null;
       }
-    } catch (IOException e) {
-      throw e;
     }
     return reader;
   }
 
   @VisibleForTesting
   static final class LogWriter implements TransactionLogWriter {
+
     private final SequenceFile.Writer internalWriter;
-    public LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException {
+
+    LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException {
       // TODO: retry a few times to ride over transient failures?
       SequenceFile.Metadata metadata = new SequenceFile.Metadata();
       metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
@@ -125,6 +123,11 @@
     }
 
     @Override
+    public long getPosition() throws IOException {
+      return internalWriter.getLength();
+    }
+
+    @Override
     public void append(Entry entry) throws IOException {
       internalWriter.append(entry.getKey(), entry.getEdit());
     }
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
index d81ba38..416aae7 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java
@@ -18,6 +18,7 @@
 
 package org.apache.tephra.persist;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.metrics.MetricsCollector;
 
 import java.io.BufferedInputStream;
@@ -40,8 +41,8 @@
    * Creates a new transaction log using the given file instance.
    * @param logFile The log file to use.
    */
-  public LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector) {
-    super(timestamp, metricsCollector);
+  LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector, Configuration conf) {
+    super(timestamp, metricsCollector, conf);
     this.logFile = logFile;
   }
 
@@ -64,12 +65,17 @@
     private final FileOutputStream fos;
     private final DataOutputStream out;
 
-    public LogWriter(File logFile) throws IOException {
+    LogWriter(File logFile) throws IOException {
       this.fos = new FileOutputStream(logFile);
       this.out = new DataOutputStream(new BufferedOutputStream(fos, LocalFileTransactionStateStorage.BUFFER_SIZE));
     }
 
     @Override
+    public long getPosition() throws IOException {
+      return fos.getChannel().position();
+    }
+
+    @Override
     public void append(Entry entry) throws IOException {
       entry.write(out);
     }
@@ -97,7 +103,7 @@
     private final DataInputStream in;
     private Entry reuseEntry = new Entry();
 
-    public LogReader(File logFile) throws IOException {
+    LogReader(File logFile) throws IOException {
       this.fin = new FileInputStream(logFile);
       this.in = new DataInputStream(new BufferedInputStream(fin, LocalFileTransactionStateStorage.BUFFER_SIZE));
     }
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
index beddbb2..3edb909 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java
@@ -61,6 +61,7 @@
   };
 
   private final String configuredSnapshotDir;
+  private final Configuration conf;
   private final MetricsCollector metricsCollector;
   private File snapshotDir;
 
@@ -69,6 +70,7 @@
                                           MetricsCollector metricsCollector) {
     super(codecProvider);
     this.configuredSnapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR);
+    this.conf = conf;
     this.metricsCollector = metricsCollector;
   }
 
@@ -220,7 +222,7 @@
       @Nullable
       @Override
       public TransactionLog apply(@Nullable TimestampedFilename input) {
-        return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector);
+        return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector, conf);
       }
     });
   }
@@ -229,7 +231,7 @@
   public TransactionLog createLog(long timestamp) throws IOException {
     File newLogFile = new File(snapshotDir, LOG_FILE_PREFIX + timestamp);
     LOG.info("Creating new transaction log at {}", newLogFile.getAbsolutePath());
-    return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector);
+    return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector, conf);
   }
 
   @Override
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
index 14893ac..ec69548 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java
@@ -43,6 +43,11 @@
   void commitMarker(int count) throws IOException;
 
   /**
+   * @return the current position, in bytes, in the underlying output.
+   */
+  long getPosition() throws IOException;
+
+  /**
    * Syncs any pending transaction edits added through {@link #append(AbstractTransactionLog.Entry)},
    * but not yet flushed to durable storage.
    *
diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
index 757b620..e295b95 100644
--- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
@@ -29,7 +29,7 @@
 import org.apache.tephra.distributed.RetryNTimes;
 import org.apache.tephra.distributed.RetryStrategy;
 import org.apache.tephra.distributed.TransactionService;
-import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.LocalFileTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
 import org.apache.tephra.runtime.ConfigModule;
 import org.apache.tephra.runtime.DiscoveryModules;
@@ -52,6 +52,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class ThriftTransactionSystemTest extends TransactionSystemTest {
@@ -79,6 +80,8 @@
     // we want to use a retry strategy that lets us query the number of times it retried:
     conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, CountingRetryStrategyProvider.class.getName());
     conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 2);
+    conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, tmpFolder.newFolder().toString());
 
     Injector injector = Guice.createInjector(
       new ConfigModule(conf),
@@ -88,7 +91,7 @@
         .with(new AbstractModule() {
           @Override
           protected void configure() {
-            bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+            bind(TransactionStateStorage.class).to(LocalFileTransactionStateStorage.class).in(Scopes.SINGLETON);
             bind(TransactionService.class).to(TestTransactionService.class).in(Scopes.SINGLETON);
           }
         }),
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
index 7a34e55..f53264b 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
@@ -191,6 +191,7 @@
     List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
     long timestamp = System.currentTimeMillis();
     Configuration configuration = getConfiguration();
+    configuration.set(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD, "0");
     FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
     SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber);
     AtomicLong logSequence = new AtomicLong();