Bound memory utilization for dynamic partitioning (i.e. memory growth is constant) (#11294)

* Bound memory in native batch ingestion when creating segments

* Move BatchAppenderatorDriverTest to indexing-service... note that we had to put the sink back into sinks in mergeAndPush since the persisted data needs to be dropped and the sink is required for that

* Remove sinks from memory and clean up intermediate persist dirs manually after the sink has been merged

* Changed name from RealtimeAppenderator to StreamAppenderator

* Style

* Incorporating tests from StreamAppenderatorTest

* Keep totalRows and cleanup code

* Added missing dep

* Fix unit test

* Checkstyle

* allowIncrementalPersists should always be true for batch

* Added sinks metadata

* clear sinks metadata when closing appenderator

* Style + minor edits to log msgs

* Update sinks metadata & totalRows when dropping a sink (segment)

* Remove max

* IntelliJ check

* Keep a count of hydrants persisted per sink for a sanity check before merge

* Move sanity check out

* Add previous hydrant count to sink metadata

* Remove redundant field from SinkMetadata

* Remove unneeded functions

* Cleanup unused code

* Removed unused code

* Remove unused field

* Exclude it from jacoco because it is very hard to get branch coverage

* Remove segment announcement and some other minor cleanup

* Add fallback flag

* Minor code cleanup

* Checkstyle

* Code review changes

* Update batchMemoryMappedIndex name

* Code review comments

* Exclude class from coverage, will include again when packaging gets fixed

* Moved test classes to server module

* More BatchAppenderator cleanup

* Fix wrong counting of totalHydrants, plus minor cleanup in add

* Removed leftover comments

* Have BatchAppenderator follow the Appenderator contract for push & getSegments

* Fix LGTM violations

* Review comments

* Add stats after push is done

* Code review comments (cleanup, remove rest of synchronization constructs in batch appenderator, rename feature flag,
remove real-time flag stuff from stream appenderator, etc.)

* Update javadocs

* Add thread safety notice to BatchAppenderator

* Further cleanup config

* More config cleanup
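
For reviewers, the core idea of the change: during batch ingestion each sink is persisted as soon as memory thresholds are hit and is then removed from memory entirely; only a small per-sink bookkeeping record (the SinkMetadata referenced throughout BatchAppenderator below) is retained until merge & push, and the legacy memory-mapping path stays reachable via `druid.indexer.task.useLegacyBatchProcessing` purely as a fallback. The sketch below is illustrative only, inferred from how the record is used in the diff (addRows, getNumRowsInSegment, getNumHydrants, getPersistedFileDir); the actual inner class lives further down in BatchAppenderator.java and may differ in detail.

```java
// Minimal sketch of the per-sink bookkeeping kept after sinks are dropped from memory.
// Hypothetical reconstruction for illustration; names mirror the usages in the diff below.
import java.io.File;

class SinkMetadata
{
  // rows added to this sink/segment so far (survives the sink being dropped from memory)
  int numRowsInSegment;
  // hydrants persisted for this sink; checked against what is restored from disk before merge
  private int numHydrants;
  // directory holding the persisted hydrants, used to resurrect the sink for merge & push
  private File persistedFileDir;

  void addRows(int rows)
  {
    numRowsInSegment += rows;
  }

  void addHydrants(int count)
  {
    numHydrants += count;
  }

  int getNumRowsInSegment()
  {
    return numRowsInSegment;
  }

  int getNumHydrants()
  {
    return numHydrants;
  }

  File getPersistedFileDir()
  {
    return persistedFileDir;
  }

  void setPersistedFileDir(File dir)
  {
    persistedFileDir = dir;
  }
}
```

Because the sinks themselves are gone after each incremental persist, this map is also what backs getSegments(), getRowCount(), and the pre-merge hydrant-count sanity check in the code below.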
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index a5e9d9a..f010792 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1334,7 +1334,7 @@
 |`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
 |`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
 |`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
-|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not map indexes thus saving heap space. This does not apply to streaming ingestion, just to batch. This setting should only be used when a bug is suspected or found in the new batch ingestion code that avoids memory mapping indices. If a bug is suspected or found, you can set this flag to `true` to fall back to previous, working but more memory intensive, code path.|`false`|
+|`druid.indexer.task.useLegacyBatchProcessing`|If false, native batch ingestion will use a new, recommended code path with memory-optimized code for the segment creation phase. If true, it will use the previous code path for the create-segments phase of batch ingestion. This does not apply to streaming ingestion, just to batch. This setting should only be used when a bug is suspected or found in the new optimized batch ingestion code; in that case, you can set this flag to `true` to fall back to the previous (working but more memory-intensive) code path.|`false`|
 |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
 |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000|
 |`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 5ef9fa6..a711d98 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -63,11 +63,6 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-collections4</artifactId>
-            <version>4.2</version>
-        </dependency>
-        <dependency>
             <groupId>io.dropwizard.metrics</groupId>
             <artifactId>metrics-core</artifactId>
         </dependency>
@@ -232,7 +227,11 @@
             <artifactId>jackson-core-asl</artifactId>
             <scope>provided</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <!-- Tests -->
         <dependency>
             <groupId>junit</groupId>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index 0285b33..27e2c55 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -77,7 +77,7 @@
   private final boolean ignoreTimestampSpecForDruidInputSource;
 
   @JsonProperty
-  private final boolean batchMemoryMappedIndex;
+  private final boolean useLegacyBatchProcessing;
 
   @JsonCreator
   public TaskConfig(
@@ -91,7 +91,7 @@
       @JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
       @JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations,
       @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource,
-      @JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMapIndex // only set to true to fall back to older behavior
+      @JsonProperty("useLegacyBatchProcessing") boolean useLegacyBatchProcessing // only set to true to fall back to older behavior
   )
   {
     this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@@ -117,7 +117,7 @@
       this.shuffleDataLocations = shuffleDataLocations;
     }
     this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
-    this.batchMemoryMappedIndex = batchMemoryMapIndex;
+    this.useLegacyBatchProcessing = useLegacyBatchProcessing;
   }
 
   @JsonProperty
@@ -201,9 +201,9 @@
   }
 
   @JsonProperty
-  public boolean getBatchMemoryMappedIndex()
+  public boolean getuseLegacyBatchProcessing()
   {
-    return batchMemoryMappedIndex;
+    return useLegacyBatchProcessing;
   }
 
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
index bff138f..1a43706 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
@@ -81,7 +81,7 @@
         toolbox.getIndexMergerV9(),
         rowIngestionMeters,
         parseExceptionHandler,
-        toolbox.getConfig().getBatchMemoryMappedIndex()
+        toolbox.getConfig().getuseLegacyBatchProcessing()
     );
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java
deleted file mode 100644
index e441763..0000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.appenderator;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.segment.realtime.appenderator.Appenderator;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorTester;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
-import org.apache.druid.testing.InitializedNullHandlingTest;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class BatchAppenderatorTest extends InitializedNullHandlingTest
-{
-  private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
-      createSegmentId("2000/2001", "A", 0),
-      createSegmentId("2000/2001", "A", 1),
-      createSegmentId("2001/2002", "A", 0)
-  );
-
-  @Test
-  public void testSimpleIngestionWithIndexesNotMapped() throws Exception
-  {
-    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
-                                                                            false,
-                                                                            false)) {
-      final Appenderator appenderator = tester.getAppenderator();
-      boolean thrown;
-
-      // startJob
-      Assert.assertEquals(null, appenderator.startJob());
-
-      // getDataSource
-      Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
-
-      // add
-      Assert.assertEquals(
-          1,
-          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
-                      .getNumRowsInSegment()
-      );
-
-      Assert.assertEquals(
-          2,
-          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
-                      .getNumRowsInSegment()
-      );
-
-      Assert.assertEquals(
-          1,
-          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
-                      .getNumRowsInSegment()
-      );
-
-      // getSegments
-      Assert.assertEquals(IDENTIFIERS.subList(0, 2),
-                          appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
-
-      // getRowCount
-      Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
-      Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
-      thrown = false;
-      try {
-        appenderator.getRowCount(IDENTIFIERS.get(2));
-      }
-      catch (IllegalStateException e) {
-        thrown = true;
-      }
-      Assert.assertTrue(thrown);
-
-      // push all
-      final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
-          appenderator.getSegments(),
-          null,
-          false
-      ).get();
-      Assert.assertEquals(
-          IDENTIFIERS.subList(0, 2),
-              Lists.transform(
-                  segmentsAndCommitMetadata.getSegments(),
-                  new Function<DataSegment, SegmentIdWithShardSpec>()
-                  {
-                    @Override
-                    public SegmentIdWithShardSpec apply(DataSegment input)
-                    {
-                      return SegmentIdWithShardSpec.fromDataSegment(input);
-                    }
-                  }
-              ).stream().sorted().collect(Collectors.toList())
-      );
-      Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
-                          segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
-
-      appenderator.clear();
-      Assert.assertTrue(appenderator.getSegments().isEmpty());
-    }
-  }
-
-  @Test
-  public void testSimpleIngestionWithIndexesMapped() throws Exception
-  {
-    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
-                                                                            false,
-                                                                            true)) {
-      final Appenderator appenderator = tester.getAppenderator();
-      boolean thrown;
-
-      // startJob
-      Assert.assertEquals(null, appenderator.startJob());
-
-      // getDataSource
-      Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
-
-      // add
-      Assert.assertEquals(
-          1,
-          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
-                      .getNumRowsInSegment()
-      );
-
-      Assert.assertEquals(
-          2,
-          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
-                      .getNumRowsInSegment()
-      );
-
-      Assert.assertEquals(
-          1,
-          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
-                      .getNumRowsInSegment()
-      );
-
-      // getSegments
-      Assert.assertEquals(IDENTIFIERS.subList(0, 2),
-                          appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
-
-      // getRowCount
-      Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
-      Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
-      thrown = false;
-      try {
-        appenderator.getRowCount(IDENTIFIERS.get(2));
-      }
-      catch (IllegalStateException e) {
-        thrown = true;
-      }
-      Assert.assertTrue(thrown);
-
-      // push all
-      final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
-          appenderator.getSegments(),
-          null,
-          false
-      ).get();
-      Assert.assertEquals(
-          IDENTIFIERS.subList(0, 2),
-          Lists.transform(
-              segmentsAndCommitMetadata.getSegments(),
-              new Function<DataSegment, SegmentIdWithShardSpec>()
-              {
-                @Override
-                public SegmentIdWithShardSpec apply(DataSegment input)
-                {
-                  return SegmentIdWithShardSpec.fromDataSegment(input);
-                }
-              }
-          ).stream().sorted().collect(Collectors.toList())
-      );
-      Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
-                          segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
-
-      appenderator.clear();
-      Assert.assertTrue(appenderator.getSegments().isEmpty());
-    }
-  }
-  private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
-  {
-    return new SegmentIdWithShardSpec(
-        AppenderatorTester.DATASOURCE,
-        Intervals.of(interval),
-        version,
-        new LinearShardSpec(partitionNum)
-
-    );
-  }
-
-  static InputRow createInputRow(String ts, String dim, Object met)
-  {
-    return new MapBasedInputRow(
-        DateTimes.of(ts).getMillis(),
-        ImmutableList.of("dim"),
-        ImmutableMap.of(
-            "dim",
-            dim,
-            "met",
-            met
-        )
-    );
-  }
-
-
-}
-
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java
deleted file mode 100644
index 4058aff..0000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.appenderator;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.JSONParseSpec;
-import org.apache.druid.data.input.impl.MapInputRowParser;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.indexing.common.task.IndexTask;
-import org.apache.druid.jackson.DefaultObjectMapper;
-import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.core.NoopEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.IndexMerger;
-import org.apache.druid.segment.IndexMergerV9;
-import org.apache.druid.segment.column.ColumnConfig;
-import org.apache.druid.segment.incremental.ParseExceptionHandler;
-import org.apache.druid.segment.incremental.RowIngestionMeters;
-import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.segment.realtime.FireDepartmentMetrics;
-import org.apache.druid.segment.realtime.appenderator.Appenderator;
-import org.apache.druid.segment.realtime.appenderator.Appenderators;
-import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.LinearShardSpec;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-public class BatchAppenderatorTester implements AutoCloseable
-{
-  public static final String DATASOURCE = "foo";
-
-  private final DataSchema schema;
-  private final IndexTask.IndexTuningConfig tuningConfig;
-  private final FireDepartmentMetrics metrics;
-  private final DataSegmentPusher dataSegmentPusher;
-  private final ObjectMapper objectMapper;
-  private final Appenderator appenderator;
-  private final IndexIO indexIO;
-  private final IndexMerger indexMerger;
-  private final ServiceEmitter emitter;
-
-  private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
-
-  public BatchAppenderatorTester(
-      final int maxRowsInMemory,
-      final boolean enablePushFailure,
-      boolean batchMemoryMappedIndex
-  )
-  {
-    this(maxRowsInMemory, -1, null, enablePushFailure, batchMemoryMappedIndex);
-  }
-
-  public BatchAppenderatorTester(
-      final int maxRowsInMemory,
-      final long maxSizeInBytes,
-      final File basePersistDirectory,
-      final boolean enablePushFailure,
-      boolean batchMemoryMappedIndex
-  )
-  {
-    this(
-        maxRowsInMemory,
-        maxSizeInBytes,
-        basePersistDirectory,
-        enablePushFailure,
-        new SimpleRowIngestionMeters(),
-        false,
-        batchMemoryMappedIndex
-    );
-  }
-
-  public BatchAppenderatorTester(
-      final int maxRowsInMemory,
-      final long maxSizeInBytes,
-      final File basePersistDirectory,
-      final boolean enablePushFailure,
-      final RowIngestionMeters rowIngestionMeters,
-      final boolean skipBytesInMemoryOverheadCheck,
-      boolean batchMemoryMappedIndex
-  )
-  {
-    objectMapper = new DefaultObjectMapper();
-    objectMapper.registerSubtypes(LinearShardSpec.class);
-
-    final Map<String, Object> parserMap = objectMapper.convertValue(
-        new MapInputRowParser(
-            new JSONParseSpec(
-                new TimestampSpec("ts", "auto", null),
-                new DimensionsSpec(null, null, null),
-                null,
-                null,
-                null
-            )
-        ),
-        Map.class
-    );
-    schema = new DataSchema(
-        DATASOURCE,
-        parserMap,
-        new AggregatorFactory[]{
-            new CountAggregatorFactory("count"),
-            new LongSumAggregatorFactory("met", "met")
-        },
-        new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
-        null,
-        objectMapper
-    );
-    tuningConfig = new IndexTask.IndexTuningConfig(
-        null,
-        2,
-        null,
-        maxRowsInMemory,
-        maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
-        false,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
-        true,
-        null,
-        null,
-        null,
-        null
-    ).withBasePersistDirectory(createNewBasePersistDirectory());
-
-    metrics = new FireDepartmentMetrics();
-
-    indexIO = new IndexIO(
-        objectMapper,
-        new ColumnConfig()
-        {
-          @Override
-          public int columnCacheSizeBytes()
-          {
-            return 0;
-          }
-        }
-    );
-    indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
-
-    emitter = new ServiceEmitter(
-        "test",
-        "test",
-        new NoopEmitter()
-    );
-    emitter.start();
-    EmittingLogger.registerEmitter(emitter);
-    dataSegmentPusher = new DataSegmentPusher()
-    {
-      private boolean mustFail = true;
-
-      @Deprecated
-      @Override
-      public String getPathForHadoop(String dataSource)
-      {
-        return getPathForHadoop();
-      }
-
-      @Override
-      public String getPathForHadoop()
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
-      {
-        if (enablePushFailure && mustFail) {
-          mustFail = false;
-          throw new IOException("Push failure test");
-        } else if (enablePushFailure) {
-          mustFail = true;
-        }
-        pushedSegments.add(segment);
-        return segment;
-      }
-
-      @Override
-      public Map<String, Object> makeLoadSpec(URI uri)
-      {
-        throw new UnsupportedOperationException();
-      }
-    };
-    appenderator = Appenderators.createOffline(
-        schema.getDataSource(),
-        schema,
-        tuningConfig,
-        metrics,
-        dataSegmentPusher,
-        objectMapper,
-        indexIO,
-        indexMerger,
-        rowIngestionMeters,
-        new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
-        batchMemoryMappedIndex
-    );
-  }
-
-  private long getDefaultMaxBytesInMemory()
-  {
-    return (Runtime.getRuntime().totalMemory()) / 3;
-  }
-
-  public DataSchema getSchema()
-  {
-    return schema;
-  }
-
-  public IndexTask.IndexTuningConfig getTuningConfig()
-  {
-    return tuningConfig;
-  }
-
-  public FireDepartmentMetrics getMetrics()
-  {
-    return metrics;
-  }
-
-  public DataSegmentPusher getDataSegmentPusher()
-  {
-    return dataSegmentPusher;
-  }
-
-  public ObjectMapper getObjectMapper()
-  {
-    return objectMapper;
-  }
-
-  public Appenderator getAppenderator()
-  {
-    return appenderator;
-  }
-
-  public List<DataSegment> getPushedSegments()
-  {
-    return pushedSegments;
-  }
-
-  @Override
-  public void close() throws Exception
-  {
-    appenderator.close();
-    emitter.close();
-    FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
-  }
-
-  private static File createNewBasePersistDirectory()
-  {
-    return FileUtils.createTempDir("druid-batch-persist");
-  }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index a205751..225769e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -105,7 +105,7 @@
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
   )
   {
     return Appenderators.createOffline(
@@ -119,7 +119,7 @@
         indexMerger,
         rowIngestionMeters,
         parseExceptionHandler,
-        batchMemoryMappedIndex
+        useLegacyBatchProcessing
     );
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index e2f226a..6070013 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -79,7 +79,7 @@
 import org.apache.druid.segment.column.DictionaryEncodedColumn;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
+import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.utils.CompressionUtils;
 import org.assertj.core.api.Assertions;
@@ -461,7 +461,7 @@
   protected void unlockAppenderatorBasePersistDirForTask(SeekableStreamIndexTask task)
       throws NoSuchMethodException, InvocationTargetException, IllegalAccessException
   {
-    Method unlockBasePersistDir = ((AppenderatorImpl) task.getAppenderator())
+    Method unlockBasePersistDir = ((StreamAppenderator) task.getAppenderator())
         .getClass()
         .getDeclaredMethod("unlockBasePersistDirectory");
     unlockBasePersistDir.setAccessible(true);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
index 1d7da70..26d6f58 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
@@ -43,6 +43,12 @@
  * Concurrency: all methods defined in this class directly, including {@link #close()} and {@link #closeNow()}, i. e.
  * all methods of the data appending and indexing lifecycle except {@link #drop} must be called from a single thread.
  * Methods inherited from {@link QuerySegmentWalker} can be called concurrently from multiple threads.
+ * <p>
+ * Important note: for historical reasons there was a single implementation of this interface ({@code AppenderatorImpl}),
+ * but it has since been split into two classes: {@link StreamAppenderator} and {@link BatchAppenderator}. With this change,
+ * all query support and concurrency have been removed from {@code BatchAppenderator}, so it no longer really makes
+ * sense for that class to be an {@code Appenderator}. In the future we may want to refactor the {@code Appenderator}
+ * interface away from {@code BatchAppenderator}.
  */
 public interface Appenderator extends QuerySegmentWalker
 {
@@ -215,15 +221,6 @@
   void closeNow();
 
   /**
-   * Flag to tell internals whether appenderator is working on behalf of a real time task.
-   * This is to manage certain aspects as needed. For example, for batch, non-real time tasks,
-   * physical segments (i.e. hydrants) do not need to memory map their persisted
-   * files. In this case, the code will avoid memory mapping them thus ameliorating the occurance
-   * of OOMs.
-   */
-  boolean isRealTime();
-
-  /**
    * Result of {@link Appenderator#add} containing following information
    * - {@link SegmentIdWithShardSpec} - identifier of segment to which rows are being added
    * - int - positive number indicating how many summarized rows exist in this segment so far and
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index 6a86f3e..743df6b 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -62,7 +62,7 @@
       ParseExceptionHandler parseExceptionHandler
   )
   {
-    return new AppenderatorImpl(
+    return new StreamAppenderator(
         id,
         schema,
         config,
@@ -88,8 +88,7 @@
         indexMerger,
         cache,
         rowIngestionMeters,
-        parseExceptionHandler,
-        true
+        parseExceptionHandler
     );
   }
 
@@ -104,24 +103,40 @@
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
   )
   {
-    return new AppenderatorImpl(
+    if (useLegacyBatchProcessing) {
+      // Fall back to code known to be working. This is just a fallback option in case the new
+      // batch appenderator has some early bugs, and it will be removed as soon as
+      // we determine that the batch appenderator code is stable.
+      return new StreamAppenderator(
+          id,
+          schema,
+          config,
+          metrics,
+          dataSegmentPusher,
+          objectMapper,
+          new NoopDataSegmentAnnouncer(),
+          null,
+          indexIO,
+          indexMerger,
+          null,
+          rowIngestionMeters,
+          parseExceptionHandler
+      );
+    }
+    return new BatchAppenderator(
         id,
         schema,
         config,
         metrics,
         dataSegmentPusher,
         objectMapper,
-        new NoopDataSegmentAnnouncer(),
-        null,
         indexIO,
         indexMerger,
-        null,
         rowIngestionMeters,
-        parseExceptionHandler,
-        batchMemoryMappedIndex // This is a task config (default false) to fallback to "old" code in case of bug with the new memory optimization code
+        parseExceptionHandler
     );
   }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index 0bbdd40..3e25bc5 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -98,7 +98,7 @@
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
   );
 
   /**
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 1a96f4e..bdd572c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -172,7 +172,7 @@
   /**
    * Allocated segments for a sequence
    */
-  static class SegmentsForSequence
+  public static class SegmentsForSequence
   {
     // Interval Start millis -> List of Segments for this interval
     // there might be multiple segments for a start interval, for example one segment
@@ -215,7 +215,7 @@
       return intervalToSegmentStates.get(timestamp);
     }
 
-    Stream<SegmentWithState> allSegmentStateStream()
+    public Stream<SegmentWithState> allSegmentStateStream()
     {
       return intervalToSegmentStates
           .values()
@@ -261,7 +261,7 @@
   }
 
   @VisibleForTesting
-  Map<String, SegmentsForSequence> getSegments()
+  public Map<String, SegmentsForSequence> getSegments()
   {
     return segments;
   }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
new file mode 100644
index 0000000..be5cadd
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -0,0 +1,1147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.FireHydrant;
+import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * This is a new class produced when the old {@code AppenderatorImpl} was split. For historical
+ * reasons, the code for creating segments was all handled by the same code path in that class. The code
+ * was correct but, from a memory perspective, inefficient for batch ingestion: if the input being processed
+ * by batch ingestion produced enough sinks and hydrants, the task could run out of memory either in the
+ * hydrant creation (append) phase or in the merge-hydrants phase. Therefore this new class,
+ * {@code BatchAppenderator}, was created to specialize in batch ingestion, and the old class
+ * used for stream ingestion was renamed to {@link StreamAppenderator}.
+ * <p>
+ * This class is not thread safe!
+ * It is important to realize that this class is completely synchronous despite the {@link Appenderator}
+ * interface suggesting otherwise. The concurrency was not required, so it has been completely removed.
+ */
+@NotThreadSafe
+public class BatchAppenderator implements Appenderator
+{
+  public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
+  // Rough estimate of memory footprint of empty FireHydrant based on actual heap dumps
+  public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
+
+  private static final EmittingLogger log = new EmittingLogger(BatchAppenderator.class);
+  private static final String IDENTIFIER_FILE_NAME = "identifier.json";
+
+  private final String myId;
+  private final DataSchema schema;
+  private final AppenderatorConfig tuningConfig;
+  private final FireDepartmentMetrics metrics;
+  private final DataSegmentPusher dataSegmentPusher;
+  private final ObjectMapper objectMapper;
+  private final IndexIO indexIO;
+  private final IndexMerger indexMerger;
+  private final Map<SegmentIdWithShardSpec, Sink> sinks = new HashMap<>();
+  private final long maxBytesTuningConfig;
+  private final boolean skipBytesInMemoryOverheadCheck;
+
+  /**
+   * The following sinks metadata map and associated class are the way to retain metadata now that sinks
+   * are being completely removed from memory after each incremental persist.
+   */
+  private final Map<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new HashMap<>();
+
+  // These variables are updated in add(), persist(), and drop()
+  private int rowsCurrentlyInMemory = 0;
+  private int totalRows = 0;
+  private long bytesCurrentlyInMemory = 0;
+  private final RowIngestionMeters rowIngestionMeters;
+  private final ParseExceptionHandler parseExceptionHandler;
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private volatile FileLock basePersistDirLock = null;
+  private volatile FileChannel basePersistDirLockChannel = null;
+
+  BatchAppenderator(
+      String id,
+      DataSchema schema,
+      AppenderatorConfig tuningConfig,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      IndexIO indexIO,
+      IndexMerger indexMerger,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
+  )
+  {
+    this.myId = id;
+    this.schema = Preconditions.checkNotNull(schema, "schema");
+    this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
+    this.metrics = Preconditions.checkNotNull(metrics, "metrics");
+    this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
+    this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
+    this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
+    this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
+    this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
+
+    maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
+    skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
+  }
+
+  @Override
+  public String getId()
+  {
+    return myId;
+  }
+
+  @Override
+  public String getDataSource()
+  {
+    return schema.getDataSource();
+  }
+
+  @Override
+  public Object startJob()
+  {
+    tuningConfig.getBasePersistDirectory().mkdirs();
+    lockBasePersistDirectory();
+    return null;
+  }
+
+  @Override
+  public AppenderatorAddResult add(
+      final SegmentIdWithShardSpec identifier,
+      final InputRow row,
+      @Nullable final Supplier<Committer> committerSupplier,
+      final boolean allowIncrementalPersists
+  ) throws IndexSizeExceededException, SegmentNotWritableException
+  {
+
+    Preconditions.checkArgument(
+        committerSupplier == null,
+        "Batch appenderator does not need a committer!"
+    );
+
+    Preconditions.checkArgument(
+        allowIncrementalPersists,
+        "Batch appenderator should always allow incremental persists!"
+    );
+
+    if (!identifier.getDataSource().equals(schema.getDataSource())) {
+      throw new IAE(
+          "Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!",
+          schema.getDataSource(),
+          identifier.getDataSource()
+      );
+    }
+
+    final Sink sink = getOrCreateSink(identifier);
+    metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
+    final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
+    final int sinkRowsInMemoryAfterAdd;
+    final long bytesInMemoryBeforeAdd = sink.getBytesInMemory();
+    final long bytesInMemoryAfterAdd;
+    final IncrementalIndexAddResult addResult;
+
+    try {
+      addResult = sink.add(row, false); // allowIncrementalPersists is always true for batch
+      sinkRowsInMemoryAfterAdd = addResult.getRowCount();
+      bytesInMemoryAfterAdd = addResult.getBytesInMemory();
+    }
+    catch (IndexSizeExceededException e) {
+      // Uh oh, we can't do anything about this! We can't persist (commit metadata would be out of sync) and we
+      // can't add the row (it just failed). This should never actually happen, though, because we check
+      // sink.canAddRow after returning from add.
+      log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier);
+      throw e;
+    }
+
+    if (sinkRowsInMemoryAfterAdd < 0) {
+      throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
+    }
+
+    if (addResult.isRowAdded()) {
+      rowIngestionMeters.incrementProcessed();
+    } else if (addResult.hasParseException()) {
+      parseExceptionHandler.handle(addResult.getParseException());
+    }
+
+    final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd;
+    rowsCurrentlyInMemory += numAddedRows;
+    bytesCurrentlyInMemory += bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd;
+    totalRows += numAddedRows;
+    sinksMetadata.computeIfAbsent(identifier, unused -> new SinkMetadata()).addRows(numAddedRows);
+
+    boolean persist = false;
+    List<String> persistReasons = new ArrayList<>();
+
+    if (!sink.canAppendRow()) {
+      persist = true;
+      persistReasons.add("No more rows can be appended to sink");
+    }
+    if (rowsCurrentlyInMemory >= tuningConfig.getMaxRowsInMemory()) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]",
+          rowsCurrentlyInMemory,
+          tuningConfig.getMaxRowsInMemory()
+      ));
+    }
+    if (bytesCurrentlyInMemory >= maxBytesTuningConfig) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]",
+          bytesCurrentlyInMemory,
+          maxBytesTuningConfig
+      ));
+    }
+    if (persist) {
+      // persistAll clears rowsCurrentlyInMemory, no need to update it.
+      log.info("Incremental persist to disk because %s.", String.join(",", persistReasons));
+
+      long bytesToBePersisted = 0L;
+      for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+        final Sink sinkEntry = entry.getValue();
+        if (sinkEntry != null) {
+          bytesToBePersisted += sinkEntry.getBytesInMemory();
+          if (sinkEntry.swappable()) {
+            // Code for batch no longer memory maps hydrants but they still take memory...
+            int memoryStillInUse = calculateMemoryUsedByHydrant();
+            bytesCurrentlyInMemory += memoryStillInUse;
+          }
+        }
+      }
+
+      if (!skipBytesInMemoryOverheadCheck
+          && bytesCurrentlyInMemory - bytesToBePersisted > maxBytesTuningConfig) {
+        // We are still over maxBytesTuningConfig even after persisting.
+        // This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
+        final String alertMessage = StringUtils.format(
+            "Task has exceeded safe estimated heap usage limits, failing "
+            + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])"
+            + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])",
+            sinks.size(),
+            sinks.values().stream().mapToInt(Iterables::size).sum(),
+            getTotalRowCount(),
+            bytesCurrentlyInMemory,
+            bytesToBePersisted,
+            maxBytesTuningConfig
+        );
+        final String errorMessage = StringUtils.format(
+            "%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to "
+            + "great to have enough space to process additional input rows. This check, along with metering the overhead "
+            + "of these objects to factor into the 'maxBytesInMemory' computation, can be disabled by setting "
+            + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so might allow the task to naturally encounter "
+            + "a 'java.lang.OutOfMemoryError'). Alternatively, 'maxBytesInMemory' can be increased which will cause an "
+            + "increase in heap footprint, but will allow for more intermediary segment persists to occur before "
+            + "reaching this condition.",
+            alertMessage
+        );
+        log.makeAlert(alertMessage)
+           .addData("dataSource", schema.getDataSource())
+           .emit();
+        throw new RuntimeException(errorMessage);
+      }
+
+      persistAllAndRemoveSinks();
+
+    }
+    return new AppenderatorAddResult(identifier, sinksMetadata.get(identifier).numRowsInSegment, false);
+  }
+
+  /**
+   * Returns all active segments regardless of whether they are in memory or persisted.
+   */
+  @Override
+  public List<SegmentIdWithShardSpec> getSegments()
+  {
+    return ImmutableList.copyOf(sinksMetadata.keySet());
+  }
+
+  @VisibleForTesting
+  public List<SegmentIdWithShardSpec> getInMemorySegments()
+  {
+    return ImmutableList.copyOf(sinks.keySet());
+  }
+
+  @Override
+  public int getRowCount(final SegmentIdWithShardSpec identifier)
+  {
+    return sinksMetadata.get(identifier).getNumRowsInSegment();
+  }
+
+  @Override
+  public int getTotalRowCount()
+  {
+    return totalRows;
+  }
+
+  @VisibleForTesting
+  public int getRowsInMemory()
+  {
+    return rowsCurrentlyInMemory;
+  }
+
+  @VisibleForTesting
+  public long getBytesCurrentlyInMemory()
+  {
+    return bytesCurrentlyInMemory;
+  }
+
+  @VisibleForTesting
+  public long getBytesInMemory(SegmentIdWithShardSpec identifier)
+  {
+    final Sink sink = sinks.get(identifier);
+
+    if (sink == null) {
+      return 0L; // sinks are removed after a persist
+    } else {
+      return sink.getBytesInMemory();
+    }
+  }
+
+  private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier)
+  {
+    Sink retVal = sinks.get(identifier);
+
+    if (retVal == null) {
+      retVal = new Sink(
+          identifier.getInterval(),
+          schema,
+          identifier.getShardSpec(),
+          identifier.getVersion(),
+          tuningConfig.getAppendableIndexSpec(),
+          tuningConfig.getMaxRowsInMemory(),
+          maxBytesTuningConfig,
+          null
+      );
+      bytesCurrentlyInMemory += calculateSinkMemoryInUsed();
+
+      sinks.put(identifier, retVal);
+      metrics.setSinkCount(sinks.size());
+    }
+
+    return retVal;
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
+  {
+    throw new UnsupportedOperationException("No query runner for batch appenderator");
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
+  {
+    throw new UnsupportedOperationException("No query runner for batch appenderator");
+  }
+
+  @Override
+  public void clear()
+  {
+    clear(true);
+  }
+
+  private void clear(boolean removeOnDiskData)
+  {
+    // Drop commit metadata, then abandon all segments.
+    log.info("Clearing all[%d] sinks & their hydrants, removing data on disk: [%s]", sinks.size(), removeOnDiskData);
+    // Drop everything.
+    Iterator<Map.Entry<SegmentIdWithShardSpec, Sink>> sinksIterator = sinks.entrySet().iterator();
+    sinksIterator.forEachRemaining(entry -> {
+      clearSinkMetadata(entry.getKey(), entry.getValue(), removeOnDiskData);
+      sinksIterator.remove();
+    });
+    metrics.setSinkCount(sinks.size());
+  }
+
+  @Override
+  public ListenableFuture<?> drop(final SegmentIdWithShardSpec identifier)
+  {
+    final Sink sink = sinks.get(identifier);
+    SinkMetadata sm = sinksMetadata.remove(identifier);
+    if (sm != null) {
+      int originalTotalRows = getTotalRowCount();
+      int rowsToDrop = sm.getNumRowsInSegment();
+      int totalRowsAfter = originalTotalRows - rowsToDrop;
+      if (totalRowsAfter < 0) {
+        log.warn("Total rows[%d] after dropping segment[%s] rows [%d]", totalRowsAfter, identifier, rowsToDrop);
+      }
+      totalRows = Math.max(totalRowsAfter, 0);
+    }
+    if (sink != null) {
+      clearSinkMetadata(identifier, sink, true);
+      if (sinks.remove(identifier) == null) {
+        log.warn("Sink for identifier[%s] not found, skipping", identifier);
+      }
+    }
+    return Futures.immediateFuture(null);
+  }
+
+  @Override
+  public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
+  {
+    if (committer != null) {
+      throw new ISE("committer must be null for BatchAppenderator");
+    }
+    persistAllAndRemoveSinks();
+    return Futures.immediateFuture(null);
+  }
+
+  /**
+   * Persist all sinks & their hydrants, keep their metadata, and then remove them completely from
+   * memory (to be resurrected right before merge & push)
+   */
+  private void persistAllAndRemoveSinks()
+  {
+
+    final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = new ArrayList<>();
+    int numPersistedRows = 0;
+    long bytesPersisted = 0L;
+    int totalHydrantsCount = 0;
+    final long totalSinks = sinks.size();
+    for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+      final SegmentIdWithShardSpec identifier = entry.getKey();
+      final Sink sink = entry.getValue();
+      if (sink == null) {
+        throw new ISE("No sink for identifier: %s", identifier);
+      }
+
+      final List<FireHydrant> hydrants = Lists.newArrayList(sink);
+      // Since every time we persist we also get rid of the in-memory references to sinks & hydrants,
+      // the invariant that each sink is swappable and holds exactly one unpersisted hydrant must hold here
+      int totalHydrantsForSink = hydrants.size();
+      if (totalHydrantsForSink != 1) {
+        throw new ISE("There should be only one hydrant for identifier[%s] but there are[%s]",
+                      identifier, totalHydrantsForSink
+        );
+      }
+      totalHydrantsCount += 1;
+      numPersistedRows += sink.getNumRowsInMemory();
+      bytesPersisted += sink.getBytesInMemory();
+
+      if (!sink.swappable()) {
+        throw new ISE("Sink is not swappable![%s]", identifier);
+      }
+      indexesToPersist.add(Pair.of(sink.swap(), identifier));
+
+    }
+
+    if (indexesToPersist.isEmpty()) {
+      log.info("No indexes will be persisted");
+    }
+    final Stopwatch persistStopwatch = Stopwatch.createStarted();
+    try {
+      for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : indexesToPersist) {
+        metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs));
+      }
+
+      log.info(
+          "Persisted in-memory data for segments: %s",
+          indexesToPersist.stream()
+                          .filter(itp -> itp.rhs != null)
+                          .map(itp -> itp.rhs.asSegmentId().toString())
+                          .distinct()
+                          .collect(Collectors.joining(", "))
+      );
+      log.info(
+          "Persisted stats: processed rows: [%d], persisted rows[%d], persisted sinks: [%d], persisted fireHydrants (across sinks): [%d]",
+          rowIngestionMeters.getProcessed(),
+          numPersistedRows,
+          totalSinks,
+          totalHydrantsCount
+      );
+
+    }
+    catch (Exception e) {
+      metrics.incrementFailedPersists();
+      throw e;
+    }
+    finally {
+      metrics.incrementNumPersists();
+      metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
+      persistStopwatch.stop();
+    }
+
+    // NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes.
+    rowsCurrentlyInMemory -= numPersistedRows;
+    bytesCurrentlyInMemory -= bytesPersisted;
+
+    // remove all sinks after persisting:
+    clear(false);
+
+    log.info("Persisted rows[%,d] and bytes[%,d] and removed all sinks & hydrants from memory",
+             numPersistedRows, bytesPersisted);
+
+  }
+
+  @Override
+  public ListenableFuture<SegmentsAndCommitMetadata> push(
+      final Collection<SegmentIdWithShardSpec> identifiers,
+      @Nullable final Committer committer,
+      final boolean useUniquePath
+  )
+  {
+
+    if (committer != null) {
+      throw new ISE("There should be no committer for batch ingestion");
+    }
+
+    if (useUniquePath) {
+      throw new ISE("Batch ingestion does not require uniquePath");
+    }
+
+    // Any sinks not persisted so far need to be persisted before push:
+    persistAllAndRemoveSinks();
+
+    log.info("Preparing to push...");
+
+    // get the persist dirs for the identifiers:
+    List<File> identifiersDirs = new ArrayList<>();
+    int totalHydrantsMerged = 0;
+    for (SegmentIdWithShardSpec identifier : identifiers) {
+      SinkMetadata sm = sinksMetadata.get(identifier);
+      if (sm == null) {
+        throw new ISE("No sink has been processed for identifier[%s]", identifier);
+      }
+      File persistedDir = sm.getPersistedFileDir();
+      if (persistedDir == null) {
+        throw new ISE("Sink for identifier[%s] not found in local file system", identifier);
+      }
+      identifiersDirs.add(persistedDir);
+      totalHydrantsMerged += sm.getNumHydrants();
+    }
+
+    // push all sinks for identifiers:
+    final List<DataSegment> dataSegments = new ArrayList<>();
+    for (File identifier : identifiersDirs) {
+
+      // retrieve sink from disk:
+      Pair<SegmentIdWithShardSpec, Sink> identifiersAndSinks;
+      try {
+        identifiersAndSinks = getIdentifierAndSinkForPersistedFile(identifier);
+      }
+      catch (IOException e) {
+        throw new ISE(e, "Failed to retrieve sinks for identifier[%s]", identifier);
+      }
+
+      // push it:
+      final DataSegment dataSegment = mergeAndPush(
+          identifiersAndSinks.lhs,
+          identifiersAndSinks.rhs
+      );
+
+      // record it:
+      if (dataSegment != null) {
+        dataSegments.add(dataSegment);
+      } else {
+        log.warn("mergeAndPush[%s] returned null, skipping.", identifiersAndSinks.lhs);
+      }
+
+    }
+
+    log.info("Push complete: total sinks merged[%d], total hydrants merged[%d]",
+             identifiers.size(), totalHydrantsMerged);
+
+    return Futures.immediateFuture(new SegmentsAndCommitMetadata(dataSegments, null));
+  }
+
+  /**
+   * Merge segment, push to deep storage. Should only be used on segments that have been fully persisted.
+   *
+   * @param identifier    sink identifier
+   * @param sink          sink to push
+   * @return segment descriptor, or null if the sink is no longer valid
+   */
+  @Nullable
+  private DataSegment mergeAndPush(
+      final SegmentIdWithShardSpec identifier,
+      final Sink sink
+  )
+  {
+
+    // Use a descriptor file to indicate that pushing has completed.
+    final File persistDir = computePersistDir(identifier);
+    final File mergedTarget = new File(persistDir, "merged");
+    final File descriptorFile = computeDescriptorFile(identifier);
+
+    // Sanity checks
+    if (sink.isWritable()) {
+      throw new ISE("Expected sink to be no longer writable before mergeAndPush for segment[%s].", identifier);
+    }
+
+    int numHydrants = 0;
+    for (FireHydrant hydrant : sink) {
+      if (!hydrant.hasSwapped()) {
+        throw new ISE("Expected sink to be fully persisted before mergeAndPush for segment[%s].", identifier);
+      }
+      numHydrants++;
+    }
+
+    SinkMetadata sm = sinksMetadata.get(identifier);
+    if (sm == null) {
+      log.warn("Sink metadata not found just before merge for identifier [%s]", identifier);
+    } else if (numHydrants != sm.getNumHydrants()) {
+      throw new ISE("Number of restored hydrants[%d] for identifier[%s] does not match expected value[%d]",
+                    numHydrants, identifier, sm.getNumHydrants());
+    }
+
+    try {
+      if (descriptorFile.exists()) {
+        // Already pushed.
+        log.info("Segment[%s] already pushed, skipping.", identifier);
+        return objectMapper.readValue(descriptorFile, DataSegment.class);
+      }
+
+      removeDirectory(mergedTarget);
+
+      if (mergedTarget.exists()) {
+        throw new ISE("Merged target[%s] exists after removing?!", mergedTarget);
+      }
+
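+      // Load the queryable index of every persisted hydrant for this sink and merge them into a single
+      // segment directory (mergedTarget).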
+      final File mergedFile;
+      final long mergeFinishTime;
+      final long startTime = System.nanoTime();
+      List<QueryableIndex> indexes = new ArrayList<>();
+      Closer closer = Closer.create();
+      try {
+        for (FireHydrant fireHydrant : sink) {
+          Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
+          final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
+          log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant);
+          indexes.add(queryableIndex);
+          closer.register(segmentAndCloseable.rhs);
+        }
+
+        mergedFile = indexMerger.mergeQueryableIndex(
+            indexes,
+            schema.getGranularitySpec().isRollup(),
+            schema.getAggregators(),
+            schema.getDimensionsSpec(),
+            mergedTarget,
+            tuningConfig.getIndexSpec(),
+            tuningConfig.getSegmentWriteOutMediumFactory(),
+            tuningConfig.getMaxColumnsToMerge()
+        );
+
+        mergeFinishTime = System.nanoTime();
+
+        log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime - startTime) / 1000000);
+      }
+      catch (Throwable t) {
+        throw closer.rethrow(t);
+      }
+      finally {
+        closer.close();
+      }
+
+      // Retry pushing segments because uploading to deep storage might fail especially for cloud storage types
+      final DataSegment segment = RetryUtils.retry(
+          // This appenderator is used only for the local indexing task so unique paths are not required
+          () -> dataSegmentPusher.push(
+              mergedFile,
+              sink.getSegment()
+                  .withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(
+                      indexes,
+                      schema.getDimensionsSpec()
+                  )),
+              false
+          ),
+          exception -> exception instanceof Exception,
+          5
+      );
+
+      // Drop the queryable indexes behind the hydrants... they are not needed anymore and their
+      // mapped file references can generate OOMs during merge if enough of them are held back...
+      for (FireHydrant fireHydrant : sink) {
+        fireHydrant.swapSegment(null);
+      }
+
+      // cleanup, sink no longer needed
+      removeDirectory(computePersistDir(identifier));
+
+      final long pushFinishTime = System.nanoTime();
+
+      log.info(
+          "Segment[%s] of %,d bytes "
+          + "built from %d incremental persist(s) in %,dms; "
+          + "pushed to deep storage in %,dms. "
+          + "Load spec is: %s",
+          identifier,
+          segment.getSize(),
+          indexes.size(),
+          (mergeFinishTime - startTime) / 1000000,
+          (pushFinishTime - mergeFinishTime) / 1000000,
+          objectMapper.writeValueAsString(segment.getLoadSpec())
+      );
+
+      return segment;
+    }
+    catch (Exception e) {
+      metrics.incrementFailedHandoffs();
+      log.warn(e, "Failed to push merged index for segment[%s].", identifier);
+      throw new RuntimeException(e);
+    }
+  }
+
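+  /**
+   * Removes in-memory sinks, releases the lock on the base persist directory, deletes any remaining
+   * persisted sink directories, and clears the sink metadata.
+   */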
+  @Override
+  public void close()
+  {
+    if (!closed.compareAndSet(false, true)) {
+      log.debug("Appenderator already closed, skipping close() call.");
+      return;
+    }
+
+    log.debug("Shutting down...");
+
+    clear(false);
+
+    unlockBasePersistDirectory();
+
+    // cleanup:
+    List<File> persistedIdentifiers = getPersistedidentifierPaths();
+    if (persistedIdentifiers != null) {
+      for (File identifier : persistedIdentifiers) {
+        removeDirectory(identifier);
+      }
+    }
+
+    totalRows = 0;
+    sinksMetadata.clear();
+  }
+
+  /**
+   * Nothing to do since there are no executors.
+   */
+  @Override
+  public void closeNow()
+  {
+    if (!closed.compareAndSet(false, true)) {
+      log.debug("Appenderator already closed, skipping closeNow() call.");
+      return;
+    }
+
+    log.debug("Shutting down immediately...");
+  }
+
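+  /**
+   * Acquires an exclusive file lock (".lock") on the base persist directory; fails fast if the lock
+   * is already held by another process.
+   */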
+  private void lockBasePersistDirectory()
+  {
+    if (basePersistDirLock == null) {
+      try {
+        basePersistDirLockChannel = FileChannel.open(
+            computeLockFile().toPath(),
+            StandardOpenOption.CREATE,
+            StandardOpenOption.WRITE
+        );
+
+        basePersistDirLock = basePersistDirLockChannel.tryLock();
+        if (basePersistDirLock == null) {
+          throw new ISE("Cannot acquire lock on basePersistDir: %s", computeLockFile());
+        }
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private void unlockBasePersistDirectory()
+  {
+    try {
+      if (basePersistDirLock != null) {
+        basePersistDirLock.release();
+        basePersistDirLockChannel.close();
+        basePersistDirLock = null;
+      }
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
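+  /**
+   * Returns the persist directories that contain an identifier file, or null if the base persist
+   * directory does not exist or cannot be listed.
+   */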
+  @VisibleForTesting
+  @Nullable
+  public List<File> getPersistedidentifierPaths()
+  {
+
+    ArrayList<File> retVal = new ArrayList<>();
+
+    final File baseDir = tuningConfig.getBasePersistDirectory();
+    if (!baseDir.exists()) {
+      return null;
+    }
+
+    final File[] files = baseDir.listFiles();
+    if (files == null) {
+      return null;
+    }
+
+    for (File sinkDir : files) {
+      final File identifierFile = new File(sinkDir, IDENTIFIER_FILE_NAME);
+      if (!identifierFile.isFile()) {
+        // No identifier in this sinkDir; it must not actually be a sink directory. Skip it.
+        continue;
+      }
+      retVal.add(sinkDir);
+    }
+
+    return retVal;
+  }
+
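+  /**
+   * Rebuilds a read-only sink from a previously persisted identifier directory: reads the identifier
+   * file, loads each numbered hydrant directory in order, and marks the resulting sink as no longer
+   * writable.
+   */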
+  private Pair<SegmentIdWithShardSpec, Sink> getIdentifierAndSinkForPersistedFile(File identifierPath)
+      throws IOException
+  {
+
+    final SegmentIdWithShardSpec identifier = objectMapper.readValue(
+        new File(identifierPath, IDENTIFIER_FILE_NAME),
+        SegmentIdWithShardSpec.class
+    );
+
+    // Skip the "merged" dir and any other special (non-numeric) files
+    final File[] sinkFiles = identifierPath.listFiles(
+        (dir, fileName) -> Ints.tryParse(fileName) != null
+    );
+    if (sinkFiles == null) {
+      throw new ISE("Problem reading persisted sinks in path[%s]", identifierPath);
+    }
+
+    Arrays.sort(
+        sinkFiles,
+        (o1, o2) -> Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName()))
+    );
+
+    List<FireHydrant> hydrants = new ArrayList<>();
+    for (File hydrantDir : sinkFiles) {
+      final int hydrantNumber = Integer.parseInt(hydrantDir.getName());
+
+      log.debug("Loading previously persisted partial segment at [%s]", hydrantDir);
+      if (hydrantNumber != hydrants.size()) {
+        throw new ISE("Missing hydrant [%,d] in identifier [%s].", hydrants.size(), identifier);
+      }
+
+      hydrants.add(
+          new FireHydrant(
+              new QueryableIndexSegment(indexIO.loadIndex(hydrantDir), identifier.asSegmentId()),
+              hydrantNumber
+          )
+      );
+    }
+
+    Sink currSink = new Sink(
+        identifier.getInterval(),
+        schema,
+        identifier.getShardSpec(),
+        identifier.getVersion(),
+        tuningConfig.getAppendableIndexSpec(),
+        tuningConfig.getMaxRowsInMemory(),
+        maxBytesTuningConfig,
+        null,
+        hydrants
+    );
+    currSink.finishWriting(); // this sink is not writable
+    return new Pair<>(identifier, currSink);
+  }
+
+  // This method does not remove the sink from its tracking Map (sinks); the caller is responsible for that.
+  // This is because the Map is not synchronized and removing elements from a map while traversing it
+  // throws a ConcurrentModificationException.
+  private void clearSinkMetadata(
+      final SegmentIdWithShardSpec identifier,
+      final Sink sink,
+      final boolean removeOnDiskData
+  )
+  {
+    // Ensure no future writes will be made to this sink.
+    if (sink.finishWriting()) {
+      // Decrement this sink's rows from the counters. We only count active sinks so that we don't double decrement,
+      // i.e. those that haven't been persisted for the *InMemory counters, or pushed to deep storage for the total counter.
+      rowsCurrentlyInMemory -= sink.getNumRowsInMemory();
+      bytesCurrentlyInMemory -= sink.getBytesInMemory();
+      bytesCurrentlyInMemory -= calculateSinkMemoryInUsed();
+      for (FireHydrant hydrant : sink) {
+        // Decrement memory used by each memory-mapped hydrant (all but the current one)
+        if (!hydrant.equals(sink.getCurrHydrant())) {
+          bytesCurrentlyInMemory -= calculateMemoryUsedByHydrant();
+        }
+      }
+      // totalRows is not decremented when removing the sink from memory; the sink was just persisted and
+      // still "lives", but in hibernation. It will be revived later, just before push.
+    }
+
+    if (removeOnDiskData) {
+      removeDirectory(computePersistDir(identifier));
+    }
+
+    log.info("Removed sink for segment[%s].", identifier);
+
+  }
+
+  private File computeLockFile()
+  {
+    return new File(tuningConfig.getBasePersistDirectory(), ".lock");
+  }
+
+  private File computePersistDir(SegmentIdWithShardSpec identifier)
+  {
+    return new File(tuningConfig.getBasePersistDirectory(), identifier.toString());
+  }
+
+  private File computeIdentifierFile(SegmentIdWithShardSpec identifier)
+  {
+    return new File(computePersistDir(identifier), IDENTIFIER_FILE_NAME);
+  }
+
+  private File computeDescriptorFile(SegmentIdWithShardSpec identifier)
+  {
+    return new File(computePersistDir(identifier), "descriptor.json");
+  }
+
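+  /**
+   * Creates the persist directory for the given identifier (if needed) and writes the identifier file
+   * into it so that the sink can be reconstructed later at push time.
+   */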
+  private File createPersistDirIfNeeded(SegmentIdWithShardSpec identifier) throws IOException
+  {
+    final File persistDir = computePersistDir(identifier);
+    org.apache.commons.io.FileUtils.forceMkdir(persistDir);
+
+    objectMapper.writeValue(computeIdentifierFile(identifier), identifier);
+
+    return persistDir;
+  }
+
+  /**
+   * Persists the given hydrant and returns the number of rows persisted.
+   *
+   * @param indexToPersist hydrant to persist
+   * @param identifier     the segment this hydrant is going to be part of
+   * @return the number of rows persisted
+   */
+  private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec identifier)
+  {
+    if (indexToPersist.hasSwapped()) {
+      throw new ISE(
+          "Segment[%s] hydrant[%s] already swapped. This cannot happen.",
+          identifier,
+          indexToPersist
+      );
+    }
+
+    log.debug("Segment[%s], persisting Hydrant[%s]", identifier, indexToPersist);
+
+    try {
+      final long startTime = System.nanoTime();
+      int numRows = indexToPersist.getIndex().size();
+
+      // Since the sink may have been persisted before, it may have lost its in-memory hydrant count;
+      // that value is remembered in the sink metadata, so pull it from there.
+      SinkMetadata sm = sinksMetadata.get(identifier);
+      if (sm == null) {
+        throw new ISE("Sink must not be null for identifier when persisting hydrant[%s]", identifier);
+      }
+      final File persistDir = createPersistDirIfNeeded(identifier);
+      indexMerger.persist(
+          indexToPersist.getIndex(),
+          identifier.getInterval(),
+          new File(persistDir, String.valueOf(sm.getNumHydrants())),
+          tuningConfig.getIndexSpecForIntermediatePersists(),
+          tuningConfig.getSegmentWriteOutMediumFactory()
+      );
+      sm.setPersistedFileDir(persistDir);
+
+      log.info(
+          "Persisted in-memory data for segment[%s] spill[%s] to disk in [%,d] ms (%,d rows).",
+          indexToPersist.getSegmentId(),
+          indexToPersist.getCount(),
+          (System.nanoTime() - startTime) / 1000000,
+          numRows
+      );
+
+      indexToPersist.swapSegment(null);
+      // remember hydrant count:
+      sm.addHydrants(1);
+
+      return numRows;
+    }
+    catch (IOException e) {
+      log.makeAlert("Incremental persist failed")
+         .addData("segment", identifier.toString())
+         .addData("dataSource", schema.getDataSource())
+         .addData("count", indexToPersist.getCount())
+         .emit();
+
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void removeDirectory(final File target)
+  {
+    if (target.exists()) {
+      try {
+        FileUtils.deleteDirectory(target);
+        log.info("Removed directory [%s]", target);
+      }
+      catch (Exception e) {
+        log.makeAlert(e, "Failed to remove directory[%s]", schema.getDataSource())
+           .addData("file", target)
+           .emit();
+      }
+    }
+  }
+
+  private int calculateMemoryUsedByHydrant()
+  {
+    if (skipBytesInMemoryOverheadCheck) {
+      return 0;
+    }
+    // These calculations are approximated from actual heap dumps.
+    return Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT;
+  }
+
+  private int calculateSinkMemoryInUsed()
+  {
+    if (skipBytesInMemoryOverheadCheck) {
+      return 0;
+    }
+    // Rough estimate of memory footprint of empty Sink based on actual heap dumps
+    return ROUGH_OVERHEAD_PER_SINK;
+  }
+
+  /**
+   * Holds information about a Sink that must be kept as sinks are persisted and removed
+   * from memory at every incremental persist. The information is used for sanity checks and,
+   * depending on the field, for functionality; see the field-level comments for details.
+   */
+  private static class SinkMetadata
+  {
+    /**
+     * Number of rows in the sink, maintained across persists of the sink. Used to detect
+     * whether an incremental push is needed
+     * ({@link AppenderatorDriverAddResult#isPushRequired(Integer, Long)}).
+     */
+    private int numRowsInSegment;
+    /**
+     * Used for sanity checks as well as functionality: to make sure that all hydrants for a sink are restored
+     * from disk at push time and also to remember the fire hydrant "count" when persisting it.
+     */
+    private int numHydrants;
+    /** Reference to the directory that holds the persisted data. */
+    File persistedFileDir;
+
+    public SinkMetadata()
+    {
+      this(0, 0);
+    }
+
+    public SinkMetadata(int numRowsInSegment, int numHydrants)
+    {
+      this.numRowsInSegment = numRowsInSegment;
+      this.numHydrants = numHydrants;
+    }
+
+    public void addRows(int num)
+    {
+      numRowsInSegment += num;
+    }
+
+    public void addHydrants(int num)
+    {
+      numHydrants += num;
+    }
+
+    public int getNumRowsInSegment()
+    {
+      return numRowsInSegment;
+    }
+
+    public int getNumHydrants()
+    {
+      return numHydrants;
+    }
+
+    public void setPersistedFileDir(File persistedFileDir)
+    {
+      this.persistedFileDir = persistedFileDir;
+    }
+
+    public File getPersistedFileDir()
+    {
+      return persistedFileDir;
+    }
+
+  }
+
+}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index 35b0884..0dcd7d6 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -90,7 +90,7 @@
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
   )
   {
     throw new UOE(ERROR_MSG);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 323122a..1c44b30 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -122,7 +122,7 @@
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
   )
   {
     // CompactionTask does run multiple sub-IndexTasks, so we allow multiple batch appenderators
@@ -140,7 +140,7 @@
           indexMerger,
           rowIngestionMeters,
           parseExceptionHandler,
-          batchMemoryMappedIndex
+          useLegacyBatchProcessing
       );
       return batchAppenderator;
     }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
similarity index 93%
rename from server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
rename to server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index e6cd9c5..1ee36f5 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -60,7 +60,6 @@
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.ReferenceCountingSegment;
-import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -72,7 +71,6 @@
 import org.apache.druid.segment.realtime.plumber.Sink;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.joda.time.Interval;
 
@@ -86,9 +84,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -104,7 +100,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
-public class AppenderatorImpl implements Appenderator
+public class StreamAppenderator implements Appenderator
 {
   // Rough estimate of memory footprint of a ColumnHolder based on actual heap dumps
   public static final int ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER = 1000;
@@ -115,7 +111,7 @@
   // Rough estimate of memory footprint of empty FireHydrant based on actual heap dumps
   public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
 
-  private static final EmittingLogger log = new EmittingLogger(AppenderatorImpl.class);
+  private static final EmittingLogger log = new EmittingLogger(StreamAppenderator.class);
   private static final int WARN_DELAY = 1000;
   private static final String IDENTIFIER_FILE_NAME = "identifier.json";
 
@@ -166,19 +162,6 @@
 
   private volatile Throwable persistError;
 
-  private final boolean isRealTime;
-  /**
-   * Use next Map to store metadata (File, SegmentId) for a hydrant for batch appenderator
-   * in order to facilitate the mapping of the QueryableIndex associated with a given hydrant
-   * at merge time. This is necessary since batch appenderator will not map the QueryableIndex
-   * at persist time in order to minimize its memory footprint. This has to be synchronized since the
-   * map may be accessed from multiple threads.
-   * Use {@link IdentityHashMap} to better reflect the fact that the key needs to be interpreted
-   * with reference semantics.
-   */
-  private final Map<FireHydrant, Pair<File, SegmentId>> persistedHydrantMetadata =
-      Collections.synchronizedMap(new IdentityHashMap<>());
-
   /**
    * This constructor allows the caller to provide its own SinkQuerySegmentWalker.
    *
@@ -188,7 +171,7 @@
    * It is used by UnifiedIndexerAppenderatorsManager which allows queries on data associated with multiple
    * Appenderators.
    */
-  AppenderatorImpl(
+  StreamAppenderator(
       String id,
       DataSchema schema,
       AppenderatorConfig tuningConfig,
@@ -201,8 +184,7 @@
       IndexMerger indexMerger,
       Cache cache,
       RowIngestionMeters rowIngestionMeters,
-      ParseExceptionHandler parseExceptionHandler,
-      boolean isRealTime
+      ParseExceptionHandler parseExceptionHandler
   )
   {
     this.myId = id;
@@ -218,7 +200,6 @@
     this.texasRanger = sinkQuerySegmentWalker;
     this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
     this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
-    this.isRealTime = isRealTime;
 
     if (sinkQuerySegmentWalker == null) {
       this.sinkTimeline = new VersionedIntervalTimeline<>(
@@ -555,9 +536,6 @@
           futures.add(abandonSegment(entry.getKey(), entry.getValue(), true));
         }
 
-        // Re-initialize hydrant map:
-        persistedHydrantMetadata.clear();
-
         // Await dropping.
         Futures.allAsList(futures).get();
       }
@@ -867,34 +845,6 @@
       Closer closer = Closer.create();
       try {
         for (FireHydrant fireHydrant : sink) {
-
-          // if batch, swap/persist did not memory map the incremental index, we need it mapped now:
-          if (!isRealTime()) {
-
-            // sanity
-            Pair<File, SegmentId> persistedMetadata = persistedHydrantMetadata.get(fireHydrant);
-            if (persistedMetadata == null) {
-              throw new ISE("Persisted metadata for batch hydrant [%s] is null!", fireHydrant);
-            }
-
-            File persistedFile = persistedMetadata.lhs;
-            SegmentId persistedSegmentId = persistedMetadata.rhs;
-
-            // sanity:
-            if (persistedFile == null) {
-              throw new ISE("Persisted file for batch hydrant [%s] is null!", fireHydrant);
-            } else if (persistedSegmentId == null) {
-              throw new ISE(
-                  "Persisted segmentId for batch hydrant in file [%s] is null!",
-                  persistedFile.getPath()
-              );
-            }
-            fireHydrant.swapSegment(new QueryableIndexSegment(
-                indexIO.loadIndex(persistedFile),
-                persistedSegmentId
-            ));
-          }
-
           Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
           final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
           log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant);
@@ -942,15 +892,6 @@
           5
       );
 
-      if (!isRealTime()) {
-        // Drop the queryable indexes behind the hydrants... they are not needed anymore and their
-        // mapped file references
-        // can generate OOMs during merge if enough of them are held back...
-        for (FireHydrant fireHydrant : sink) {
-          fireHydrant.swapSegment(null);
-        }
-      }
-
       final long pushFinishTime = System.nanoTime();
 
       objectMapper.writeValue(descriptorFile, segment);
@@ -1077,13 +1018,6 @@
     }
   }
 
-  @Override
-  public boolean isRealTime()
-  {
-    return isRealTime;
-  }
-
-
   private void lockBasePersistDirectory()
   {
     if (basePersistDirLock == null) {
@@ -1401,8 +1335,6 @@
                 cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
               }
               hydrant.swapSegment(null);
-              // remove hydrant from persisted metadata:
-              persistedHydrantMetadata.remove(hydrant);
             }
 
             if (removeOnDiskData) {
@@ -1517,15 +1449,10 @@
             numRows
         );
 
-        // Map only when this appenderator is being driven by a real time task:
-        Segment segmentToSwap = null;
-        if (isRealTime()) {
-          segmentToSwap = new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId());
-        } else {
-          // remember file path & segment id to rebuild the queryable index for merge:
-          persistedHydrantMetadata.put(indexToPersist, new Pair<>(persistedFile, indexToPersist.getSegmentId()));
-        }
-        indexToPersist.swapSegment(segmentToSwap);
+        indexToPersist.swapSegment(new QueryableIndexSegment(
+            indexIO.loadIndex(persistedFile),
+            indexToPersist.getSegmentId()
+        ));
 
         return numRows;
       }
@@ -1563,14 +1490,10 @@
     // These calculations are approximated from actual heap dumps.
     // Memory footprint includes count integer in FireHydrant, shorts in ReferenceCountingSegment,
     // Objects in SimpleQueryableIndex (such as SmooshedFileMapper, each ColumnHolder in column map, etc.)
-    int total;
-    total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT;
-    if (isRealTime()) {
-      // for real time add references to byte memory mapped references..
-      total += (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
-               (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
-               ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
-    }
+    int total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT +
+                (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
+                (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+                ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
     return total;
   }
 
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index faba506..f390f76 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -175,7 +175,7 @@
           DatasourceBundle::new
       );
 
-      Appenderator appenderator = new AppenderatorImpl(
+      Appenderator appenderator = new StreamAppenderator(
           taskId,
           schema,
           rewriteAppenderatorConfigMemoryLimits(config),
@@ -188,8 +188,7 @@
           wrapIndexMerger(indexMerger),
           cache,
           rowIngestionMeters,
-          parseExceptionHandler,
-          true
+          parseExceptionHandler
       );
 
       datasourceBundle.addAppenderator(taskId, appenderator);
@@ -209,7 +208,7 @@
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      boolean batchMemoryMappedIndex
+      boolean useLegacyBatchProcessing
   )
   {
     synchronized (this) {
@@ -229,7 +228,7 @@
           wrapIndexMerger(indexMerger),
           rowIngestionMeters,
           parseExceptionHandler,
-          batchMemoryMappedIndex
+          useLegacyBatchProcessing
       );
       datasourceBundle.addAppenderator(taskId, appenderator);
       return appenderator;
@@ -493,7 +492,7 @@
 
   /**
    * This wrapper around IndexMerger limits concurrent calls to the merge/persist methods used by
-   * {@link AppenderatorImpl} with a shared executor service. Merge/persist methods that are not used by
+   * {@link StreamAppenderator} with a shared executor service. Merge/persist methods that are not used by
    * AppenderatorImpl will throw an exception if called.
    */
   public static class LimitedPoolIndexMerger implements IndexMerger
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
index c07fbd0..f2d793e 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
@@ -34,11 +34,11 @@
 public class AppenderatorPlumberTest
 {
   private final AppenderatorPlumber plumber;
-  private final AppenderatorTester appenderatorTester;
+  private final StreamAppenderatorTester streamAppenderatorTester;
 
   public AppenderatorPlumberTest() throws Exception
   {
-    this.appenderatorTester = new AppenderatorTester(10);
+    this.streamAppenderatorTester = new StreamAppenderatorTester(10);
     DataSegmentAnnouncer segmentAnnouncer = EasyMock
         .createMock(DataSegmentAnnouncer.class);
     segmentAnnouncer.announceSegment(EasyMock.anyObject());
@@ -84,26 +84,27 @@
         null
     );
 
-    this.plumber = new AppenderatorPlumber(appenderatorTester.getSchema(),
-        tuningConfig, appenderatorTester.getMetrics(),
-        segmentAnnouncer, segmentPublisher, handoffNotifier,
-        appenderatorTester.getAppenderator());
+    this.plumber = new AppenderatorPlumber(streamAppenderatorTester.getSchema(),
+                                           tuningConfig, streamAppenderatorTester.getMetrics(),
+                                           segmentAnnouncer, segmentPublisher, handoffNotifier,
+                                           streamAppenderatorTester.getAppenderator());
 
   }
 
   @Test
   public void testSimpleIngestion() throws Exception
   {
-    Appenderator appenderator = appenderatorTester.getAppenderator();
+    Appenderator appenderator = streamAppenderatorTester.getAppenderator();
 
     // startJob
     Assert.assertEquals(null, plumber.startJob());
 
     // getDataSource
-    Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
+    Assert.assertEquals(StreamAppenderatorTester.DATASOURCE, appenderator.getDataSource());
 
-    InputRow[] rows = new InputRow[] {AppenderatorTest.ir("2000", "foo", 1), 
-        AppenderatorTest.ir("2000", "bar", 2), AppenderatorTest.ir("2000", "qux", 4)};
+    InputRow[] rows = new InputRow[] {
+        StreamAppenderatorTest.ir("2000", "foo", 1),
+        StreamAppenderatorTest.ir("2000", "bar", 2), StreamAppenderatorTest.ir("2000", "qux", 4)};
     // add
     Assert.assertEquals(1, plumber.add(rows[0], null).getRowCount());
 
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 024bfa0..2cf78cc 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -29,21 +29,25 @@
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
 import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
-import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -73,7 +77,7 @@
   );
 
   private SegmentAllocator allocator;
-  private AppenderatorTester appenderatorTester;
+  private BatchAppenderatorTester appenderatorTester;
   private BatchAppenderatorDriver driver;
   private DataSegmentKiller dataSegmentKiller;
 
@@ -84,13 +88,13 @@
   @Before
   public void setup()
   {
-    appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
+    appenderatorTester = new BatchAppenderatorTester(MAX_ROWS_IN_MEMORY);
     allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
     dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
     driver = new BatchAppenderatorDriver(
         appenderatorTester.getAppenderator(),
         allocator,
-        new TestUsedSegmentChecker(appenderatorTester),
+        new TestUsedSegmentChecker(appenderatorTester.getPushedSegments()),
         dataSegmentKiller
     );
 
@@ -199,4 +203,38 @@
   {
     return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of());
   }
+
+  static class TestSegmentAllocator implements SegmentAllocator
+  {
+    private final String dataSource;
+    private final Granularity granularity;
+    private final Map<Long, AtomicInteger> counters = new HashMap<>();
+
+    public TestSegmentAllocator(String dataSource, Granularity granularity)
+    {
+      this.dataSource = dataSource;
+      this.granularity = granularity;
+    }
+
+    @Override
+    public SegmentIdWithShardSpec allocate(
+        final InputRow row,
+        final String sequenceName,
+        final String previousSegmentId,
+        final boolean skipSegmentLineageCheck
+    )
+    {
+      DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp());
+      final long timestampTruncated = dateTimeTruncated.getMillis();
+      counters.putIfAbsent(timestampTruncated, new AtomicInteger());
+      final int partitionNum = counters.get(timestampTruncated).getAndIncrement();
+      return new SegmentIdWithShardSpec(
+          dataSource,
+          granularity.bucket(dateTimeTruncated),
+          VERSION,
+          new NumberedShardSpec(partitionNum, 0)
+      );
+    }
+  }
+
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
new file mode 100644
index 0000000..05b26b4
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
@@ -0,0 +1,918 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class BatchAppenderatorTest extends InitializedNullHandlingTest
+{
+  private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
+      createSegmentId("2000/2001", "A", 0), // should be in seg_0
+      createSegmentId("2000/2001", "A", 1), // seg_1
+      createSegmentId("2001/2002", "A", 0) // seg 2
+  );
+
+  @Test
+  public void testSimpleIngestion() throws Exception
+  {
+    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(3, true)) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      // startJob
+      Assert.assertNull(appenderator.startJob());
+
+      // getDataSource
+      Assert.assertEquals(BatchAppenderatorTester.DATASOURCE, appenderator.getDataSource());
+
+      // add #1
+      Assert.assertEquals(
+          1,
+          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
+                      .getNumRowsInSegment()
+      );
+
+      // add #2
+      Assert.assertEquals(
+          1,
+          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 2), null)
+                      .getNumRowsInSegment()
+      );
+
+      // getSegments
+      Assert.assertEquals(
+          IDENTIFIERS.subList(0, 2),
+          appenderator.getSegments().stream().sorted().collect(Collectors.toList())
+      );
+
+
+      // add #3, this hits max rows in memory:
+      Assert.assertEquals(
+          2,
+          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "sux", 1), null)
+                      .getNumRowsInSegment()
+      );
+
+      // since we just added three rows and the max rows in memory is three, all the segments (sinks etc)
+      // above should be cleared now
+      Assert.assertEquals(
+          Collections.emptyList(),
+          ((BatchAppenderator) appenderator).getInMemorySegments().stream().sorted().collect(Collectors.toList())
+      );
+
+      // add #4, this will add one more temporary segment:
+      Assert.assertEquals(
+          1,
+          appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "qux", 4), null)
+                      .getNumRowsInSegment()
+      );
+
+
+      // push all
+      final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
+          appenderator.getSegments(),
+          null,
+          false
+      ).get();
+      Assert.assertEquals(
+          IDENTIFIERS.subList(0, 3),
+          Lists.transform(
+              segmentsAndCommitMetadata.getSegments(),
+              SegmentIdWithShardSpec::fromDataSegment
+          ).stream().sorted().collect(Collectors.toList())
+      );
+      Assert.assertEquals(
+          tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
+          segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
+      );
+
+      appenderator.close();
+      Assert.assertTrue(appenderator.getSegments().isEmpty());
+    }
+  }
+
+  @Test
+  public void testSimpleIngestionWithFallbackCodePath() throws Exception
+  {
+    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(
+        3,
+        -1,
+        null,
+        true,
+        new SimpleRowIngestionMeters(),
+        true,
+        true
+    )) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      // startJob
+      Assert.assertNull(appenderator.startJob());
+
+      // getDataSource
+      Assert.assertEquals(BatchAppenderatorTester.DATASOURCE, appenderator.getDataSource());
+
+      // add #1
+      Assert.assertEquals(
+          1,
+          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
+                      .getNumRowsInSegment()
+      );
+
+      // add #2
+      Assert.assertEquals(
+          1,
+          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 2), null)
+                      .getNumRowsInSegment()
+      );
+
+      // getSegments
+      Assert.assertEquals(
+          IDENTIFIERS.subList(0, 2),
+          appenderator.getSegments().stream().sorted().collect(Collectors.toList())
+      );
+
+
+      // add #3, this hits max rows in memory:
+      Assert.assertEquals(
+          2,
+          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "sux", 1), null)
+                      .getNumRowsInSegment()
+      );
+
+      // since we just added three rows and the max rows in memory is three BUT we are using
+      // the old, fallback, code path that does not remove sinks, the segments should still be there
+      Assert.assertEquals(
+          2,
+          appenderator.getSegments().size()
+      );
+
+      // add #4, this will add one more temporary segment:
+      Assert.assertEquals(
+          1,
+          appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "qux", 4), null)
+                      .getNumRowsInSegment()
+      );
+
+
+      // push all
+      final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
+          appenderator.getSegments(),
+          null,
+          false
+      ).get();
+      Assert.assertEquals(
+          IDENTIFIERS.subList(0, 3),
+          Lists.transform(
+              segmentsAndCommitMetadata.getSegments(),
+              SegmentIdWithShardSpec::fromDataSegment
+          ).stream().sorted().collect(Collectors.toList())
+      );
+      Assert.assertEquals(
+          tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
+          segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
+      );
+
+      appenderator.close();
+      Assert.assertTrue(appenderator.getSegments().isEmpty());
+    }
+  }
+
+  @Test
+  public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
+  {
+    try (
+        final BatchAppenderatorTester tester = new BatchAppenderatorTester(
+            100,
+            1024,
+            null,
+            true,
+            new SimpleRowIngestionMeters(),
+            true,
+            false
+        )
+    ) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+      //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
+      int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
+      Assert.assertEquals(
+          182 + nullHandlingOverhead,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+      );
+      appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
+      Assert.assertEquals(
+          182 + nullHandlingOverhead,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+      );
+      appenderator.close();
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+    }
+  }
+
+  @Test
+  public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
+  {
+    try (
+        final BatchAppenderatorTester tester = new BatchAppenderatorTester(
+            100,
+            1024,
+            null,
+            true,
+            new SimpleRowIngestionMeters(),
+            true,
+            false
+        )
+    ) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+      //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182
+      int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
+      Assert.assertEquals(182 + nullHandlingOverhead, ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory());
+      appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
+      Assert.assertEquals(
+          364 + 2 * nullHandlingOverhead,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+      Assert.assertEquals(2, appenderator.getSegments().size());
+      appenderator.close();
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+    }
+  }
+
+  @Test
+  public void testMaxBytesInMemory() throws Exception
+  {
+    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(100, 15000, true)) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+      // Still under maxSizeInBytes after the add. Hence, we do not persist yet
+      //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
+      int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
+      int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+      int sinkSizeOverhead = BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
+      // currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+      );
+      Assert.assertEquals(
+          currentInMemoryIndexSize + sinkSizeOverhead,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+
+      // We do multiple more adds to the same sink to cause persist.
+      for (int i = 0; i < 53; i++) {
+        appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i, 1), null);
+      }
+      sinkSizeOverhead = BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
+      // currHydrant size is 0 since we just persist all indexes to disk.
+      currentInMemoryIndexSize = 0;
+      // We are now over maxSizeInBytes after the add. Hence, we do a persist.
+      // currHydrant in the sink has 0 bytesInMemory since we just did a persist
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+      );
+      // no sinks no hydrants after a persist so we should have zero bytes currently in memory
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+
+      // Add a single row after persisted
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bob", 1), null);
+      // currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
+      currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+      );
+      Assert.assertEquals(
+          currentInMemoryIndexSize + sinkSizeOverhead,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+
+      // We do multiple more adds to the same sink to cause persist.
+      for (int i = 0; i < 53; i++) {
+        appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i, 1), null);
+      }
+      // currHydrant size is 0 since we just persist all indexes to disk.
+      currentInMemoryIndexSize = 0;
+      // We are now over maxSizeInBytes after the add. Hence, we do a persist.
+      // so no sinks & hydrants should be in memory...
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+      );
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+      appenderator.close();
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory());
+    }
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
+  {
+    try (final BatchAppenderatorTester tester =
+             new BatchAppenderatorTester(100, 5180, true)) {
+      final Appenderator appenderator = tester.getAppenderator();
+      appenderator.startJob();
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+    }
+  }
+
+  @Test
+  public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
+  {
+    try (
+        final BatchAppenderatorTester tester = new BatchAppenderatorTester(
+            100,
+            10,
+            null,
+            true,
+            new SimpleRowIngestionMeters(),
+            true,
+            false
+        )
+    ) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+      // Expected 0 since we persisted after the add
+      Assert.assertEquals(
+          0,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+      // Expected 0 since we persisted after the add
+      Assert.assertEquals(
+          0,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+    }
+  }
+
+  @Test
+  public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception
+  {
+    try (final BatchAppenderatorTester tester =
+             new BatchAppenderatorTester(100, 10000, true)) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+
+      // Still under maxSizeInBytes after the add. Hence, we do not persist yet
+      int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
+      int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+      int sinkSizeOverhead = BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
+      Assert.assertEquals(
+          currentInMemoryIndexSize + sinkSizeOverhead,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+
+      // Close with row still in memory (no persist)
+      appenderator.close();
+
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory());
+    }
+  }
+
+  @Test
+  public void testMaxBytesInMemoryInMultipleSinks() throws Exception
+  {
+    try (final BatchAppenderatorTester tester =
+             new BatchAppenderatorTester(1000, 28748, true)) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+      // next records are 182 bytes
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+      appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
+
+      // Still under maxSizeInBytes after the add. Hence, we do not persist yet
+      //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
+      int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
+      int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+      int sinkSizeOverhead = 2 * BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
+      // currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+      );
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+      );
+      Assert.assertEquals(
+          (2 * currentInMemoryIndexSize) + sinkSizeOverhead,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+
+      // We do multiple more adds to the same sink to cause persist.
+      for (int i = 0; i < 49; i++) {
+        // these records are 186 bytes
+        appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i, 1), null);
+        appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar_" + i, 1), null);
+      }
+      // sinks + currHydrant size is 0 since we just persisted all indexes to disk.
+      currentInMemoryIndexSize = 0;
+      // We are now over maxSizeInBytes after the adds. Hence, we do a persist.
+      // currHydrant and the sinks have 0 bytesInMemory since we just did a persist
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+      );
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+      );
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+
+      // Add a single row to sink 0 after the persist
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bob", 1), null);
+      // currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
+      currentInMemoryIndexSize = 182 + nullHandlingOverhead;
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+      );
+      Assert.assertEquals(
+          0,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+      );
+      // only one sink is currently in memory:
+      sinkSizeOverhead = BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
+      Assert.assertEquals(
+          currentInMemoryIndexSize + sinkSizeOverhead,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+      // Now add a single row to sink 1
+      appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bob", 1), null);
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+      );
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+      );
+      sinkSizeOverhead += BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
+      Assert.assertEquals(
+          (2 * currentInMemoryIndexSize) + sinkSizeOverhead,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+
+      // We do multiple more adds to both sinks to cause a persist.
+      for (int i = 0; i < 49; i++) {
+        // 186 bytes
+        appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i, 1), null);
+        appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar_" + i, 1), null);
+      }
+      // currHydrant size is 0 since we just persisted all indexes to disk.
+      currentInMemoryIndexSize = 0;
+      // We are now over maxSizeInBytes after the add. Hence, we do a persist.
+      // currHydrant in the sink has 0 bytesInMemory since we just did a persist
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+      );
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+      );
+      // Note that the segments have 1 dimension column, 2 metric columns, and 1 time column, and that we now have
+      // two persisted indexes from the two previous persists. Unlike the stream appenderator, no mapped index
+      // memory is retained after persisting, so the total bytes in memory should be zero.
+      Assert.assertEquals(
+          currentInMemoryIndexSize,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+      appenderator.close();
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory());
+    }
+  }
+
+  @Test
+  public void testIgnoreMaxBytesInMemory() throws Exception
+  {
+    try (final BatchAppenderatorTester tester =
+             new BatchAppenderatorTester(100, -1, true)) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.startJob();
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+      // the size is still tracked even though it is ignored when making the persist decision
+      int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
+      Assert.assertEquals(
+          182 + nullHandlingOverhead,
+          ((BatchAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+      );
+      Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
+
+      // we added only two rows and specified that maxSizeInBytes should be ignored, so nothing should have been
+      // persisted:
+      int sinkSizeOverhead = 2 * BatchAppenderator.ROUGH_OVERHEAD_PER_SINK;
+      Assert.assertEquals(
+          (364 + 2 * nullHandlingOverhead) + sinkSizeOverhead,
+          ((BatchAppenderator) appenderator).getBytesCurrentlyInMemory()
+      );
+      Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.close();
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+    }
+  }
+
+  @Test
+  public void testMaxRowsInMemory() throws Exception
+  {
+    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(3, true)) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.startJob();
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+      Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
+      Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
+      Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
+      // no persist since last add was for a dup record
+      appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bat", 1), null);
+      // persist expected ^ (3) rows added
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "baz", 1), null);
+      Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 1), null);
+      Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bob", 1), null);
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      // persist expected ^ (3) rows added
+      //appenderator.persistAll(null);
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.close();
+    }
+  }
+
+  @Test
+  public void testAllHydrantsAreRecovered() throws Exception
+  {
+    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(1, false)) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.startJob();
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo2", 1), null);
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo3", 1), null);
+
+      // Since maxRowsInMemory is one, there ought to be three hydrants stored and recovered just before push.
+      // Internally the code has a sanity check to make sure this works; if it does not, it throws an exception.
+      final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
+          appenderator.getSegments(),
+          null,
+          false
+      ).get();
+      Assert.assertEquals(
+          IDENTIFIERS.subList(0, 1),
+          Lists.transform(
+              segmentsAndCommitMetadata.getSegments(),
+              SegmentIdWithShardSpec::fromDataSegment
+          ).stream().sorted().collect(Collectors.toList())
+      );
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.close();
+    }
+  }
+
+  @Test
+  public void testTotalRowsPerSegment() throws Exception
+  {
+    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(3, true)) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.startJob();
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Appenderator.AppenderatorAddResult addResult0 =
+          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+      Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(1, addResult0.getNumRowsInSegment());
+
+      Appenderator.AppenderatorAddResult addResult1 =
+          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
+      Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(1, addResult1.getNumRowsInSegment());
+
+      addResult1 = // dup!
+          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
+      Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(1, addResult1.getNumRowsInSegment()); // dup record does not count
+      // no persist since last add was for a dup record
+
+      addResult1 =
+          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bat", 1), null);
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(2, addResult1.getNumRowsInSegment());
+      // persist expected ^ (3) rows added
+
+      // total rows per segment ought to be preserved even when sinks are removed from memory:
+      addResult1 =
+          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bat", 1), null);
+      Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(3, addResult1.getNumRowsInSegment());
+
+      addResult0 =
+          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "baz", 1), null);
+      Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(2, addResult0.getNumRowsInSegment());
+
+      addResult1 =
+          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 1), null);
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(4, addResult1.getNumRowsInSegment());
+      // persist expected ^ (3) rows added
+
+      addResult0 =
+          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bob", 1), null);
+      Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(3, addResult0.getNumRowsInSegment());
+
+      appenderator.close();
+
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+    }
+  }
+
+
+  @Test
+  public void testRestoreFromDisk() throws Exception
+  {
+    final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, true);
+    final Appenderator appenderator = tester.getAppenderator();
+
+    appenderator.startJob();
+
+    appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+    appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null);
+    Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+
+    appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "baz", 3), null);
+    appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null);
+    Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+
+    appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "bob", 5), null);
+    Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
+    appenderator.persistAll(null).get();
+    Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+
+    List<File> segmentPaths = ((BatchAppenderator) appenderator).getPersistedidentifierPaths();
+    Assert.assertNotNull(segmentPaths);
+    Assert.assertEquals(3, segmentPaths.size());
+
+
+    appenderator.push(IDENTIFIERS, null, false).get();
+
+    segmentPaths = ((BatchAppenderator) appenderator).getPersistedidentifierPaths();
+    Assert.assertNotNull(segmentPaths);
+    Assert.assertEquals(0, segmentPaths.size());
+
+    appenderator.close();
+
+  }
+
+  @Test
+  public void testCleanupFromDiskAfterClose() throws Exception
+  {
+    final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, true);
+    final Appenderator appenderator = tester.getAppenderator();
+
+    appenderator.startJob();
+
+    appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+    appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null);
+    Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+    Assert.assertEquals(2, appenderator.getTotalRowCount());
+
+    appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "baz", 3), null);
+    appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null);
+    Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+    Assert.assertEquals(4, appenderator.getTotalRowCount());
+
+    appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "bob", 5), null);
+    Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
+    appenderator.persistAll(null).get();
+    Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+    Assert.assertEquals(5, appenderator.getTotalRowCount());
+
+    List<File> segmentPaths = ((BatchAppenderator) appenderator).getPersistedidentifierPaths();
+    Assert.assertNotNull(segmentPaths);
+    Assert.assertEquals(3, segmentPaths.size());
+
+    appenderator.close();
+
+    segmentPaths = ((BatchAppenderator) appenderator).getPersistedidentifierPaths();
+    Assert.assertNotNull(segmentPaths);
+    Assert.assertEquals(0, segmentPaths.size());
+
+    Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+    Assert.assertEquals(0, appenderator.getTotalRowCount());
+
+  }
+
+
+  @Test(timeout = 60_000L)
+  public void testTotalRowCount() throws Exception
+  {
+    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(3, true)) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      Assert.assertEquals(0, appenderator.getTotalRowCount());
+      appenderator.startJob();
+      Assert.assertEquals(0, appenderator.getTotalRowCount());
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+      Assert.assertEquals(1, appenderator.getTotalRowCount());
+      appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), null);
+      Assert.assertEquals(2, appenderator.getTotalRowCount());
+
+      appenderator.persistAll(null).get();
+      Assert.assertEquals(2, appenderator.getTotalRowCount());
+      appenderator.drop(IDENTIFIERS.get(0)).get();
+      Assert.assertEquals(1, appenderator.getTotalRowCount());
+      appenderator.drop(IDENTIFIERS.get(1)).get();
+      Assert.assertEquals(0, appenderator.getTotalRowCount());
+
+      appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "bar", 1), null);
+      Assert.assertEquals(1, appenderator.getTotalRowCount());
+      appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "baz", 1), null);
+      Assert.assertEquals(2, appenderator.getTotalRowCount());
+      appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "qux", 1), null);
+      Assert.assertEquals(3, appenderator.getTotalRowCount());
+      appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "bob", 1), null);
+      Assert.assertEquals(4, appenderator.getTotalRowCount());
+
+      appenderator.persistAll(null).get();
+      Assert.assertEquals(4, appenderator.getTotalRowCount());
+      appenderator.drop(IDENTIFIERS.get(2)).get();
+      Assert.assertEquals(0, appenderator.getTotalRowCount());
+
+      appenderator.close();
+      Assert.assertEquals(0, appenderator.getTotalRowCount());
+    }
+  }
+
+  @Test
+  public void testVerifyRowIngestionMetrics() throws Exception
+  {
+    final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+    try (final BatchAppenderatorTester tester =
+             new BatchAppenderatorTester(5,
+                                    10000L,
+                                    null, false, rowIngestionMeters
+             )) {
+      final Appenderator appenderator = tester.getAppenderator();
+      appenderator.startJob();
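+      // the first row below carries a non-numeric value for the "met" metric, so it should be counted as processedWithError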
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000",
+                                                          "foo", "invalid_met"
+      ), null);
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+
+      Assert.assertEquals(1, rowIngestionMeters.getProcessed());
+      Assert.assertEquals(1, rowIngestionMeters.getProcessedWithError());
+      Assert.assertEquals(0, rowIngestionMeters.getUnparseable());
+      Assert.assertEquals(0, rowIngestionMeters.getThrownAway());
+    }
+  }
+
+  @Test
+  public void testPushContract() throws Exception
+  {
+    final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
+    try (final BatchAppenderatorTester tester =
+             new BatchAppenderatorTester(1,
+                                         50000L,
+                                         null, false, rowIngestionMeters
+             )) {
+      final Appenderator appenderator = tester.getAppenderator();
+      appenderator.startJob();
+
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 1), null);
+      appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar2", 1), null);
+      appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar3", 1), null);
+
+      // push only a single segment
+      final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
+          Collections.singletonList(IDENTIFIERS.get(0)),
+          null,
+          false
+      ).get();
+
+      // only one segment must have been pushed:
+      Assert.assertEquals(
+          Collections.singletonList(IDENTIFIERS.get(0)),
+          Lists.transform(
+              segmentsAndCommitMetadata.getSegments(),
+              SegmentIdWithShardSpec::fromDataSegment
+          ).stream().sorted().collect(Collectors.toList())
+      );
+
+      Assert.assertEquals(
+          tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
+          segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
+      );
+      // the responsibility for dropping lies with the BatchAppenderatorDriver, so drop manually here:
+      appenderator.drop(IDENTIFIERS.get(0));
+
+      // and the segment that was not pushed should still be active
+      Assert.assertEquals(
+          Collections.singletonList(IDENTIFIERS.get(1)),
+          appenderator.getSegments()
+      );
+
+
+    }
+  }
+  
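+  // helper for building segment identifiers for the test datasource with LinearShardSpec partitions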
+  private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
+  {
+    return new SegmentIdWithShardSpec(
+        BatchAppenderatorTester.DATASOURCE,
+        Intervals.of(interval),
+        version,
+        new LinearShardSpec(partitionNum)
+
+    );
+  }
+
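+  // builds a row with a single dimension ("dim") and a single metric ("met") at the given timestamp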
+  static InputRow createInputRow(String ts, String dim, Object met)
+  {
+    return new MapBasedInputRow(
+        DateTimes.of(ts).getMillis(),
+        ImmutableList.of("dim"),
+        ImmutableMap.of(
+            "dim",
+            dim,
+            "met",
+            met
+        )
+    );
+  }
+
+
+}
+
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
new file mode 100644
index 0000000..af628ba
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.core.NoopEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class BatchAppenderatorTester implements AutoCloseable
+{
+  public static final String DATASOURCE = "foo";
+
+  private final DataSchema schema;
+  private final AppenderatorConfig tuningConfig;
+  private final FireDepartmentMetrics metrics;
+  private final ObjectMapper objectMapper;
+  private final Appenderator appenderator;
+  private final ServiceEmitter emitter;
+
+  private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory
+  )
+  {
+    this(maxRowsInMemory, -1, null, false);
+  }
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final boolean enablePushFailure
+  )
+  {
+    this(maxRowsInMemory, -1, null, enablePushFailure);
+  }
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final long maxSizeInBytes,
+      final boolean enablePushFailure
+  )
+  {
+    this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure);
+  }
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final long maxSizeInBytes,
+      final File basePersistDirectory,
+      final boolean enablePushFailure
+  )
+  {
+    this(
+        maxRowsInMemory,
+        maxSizeInBytes,
+        basePersistDirectory,
+        enablePushFailure,
+        new SimpleRowIngestionMeters(),
+        false,
+        false
+    );
+  }
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final long maxSizeInBytes,
+      @Nullable final File basePersistDirectory,
+      final boolean enablePushFailure,
+      final RowIngestionMeters rowIngestionMeters
+  )
+  {
+    this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, rowIngestionMeters,
+         false, false
+    );
+  }
+  
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final long maxSizeInBytes,
+      @Nullable final File basePersistDirectory,
+      final boolean enablePushFailure,
+      final RowIngestionMeters rowIngestionMeters,
+      final boolean skipBytesInMemoryOverheadCheck,
+      final boolean useLegacyBatchProcessing
+  )
+  {
+    objectMapper = new DefaultObjectMapper();
+    objectMapper.registerSubtypes(LinearShardSpec.class);
+
+    final Map<String, Object> parserMap = objectMapper.convertValue(
+        new MapInputRowParser(
+            new JSONParseSpec(
+                new TimestampSpec("ts", "auto", null),
+                new DimensionsSpec(null, null, null),
+                null,
+                null,
+                null
+            )
+        ),
+        Map.class
+    );
+
+    schema = new DataSchema(
+        DATASOURCE,
+        null,
+        null,
+        new AggregatorFactory[]{
+            new CountAggregatorFactory("count"),
+            new LongSumAggregatorFactory("met", "met")
+        },
+        new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
+        null,
+        parserMap,
+        objectMapper
+    );
+
+    tuningConfig = new TestIndexTuningConfig(
+        TuningConfig.DEFAULT_APPENDABLE_INDEX,
+        maxRowsInMemory,
+        maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
+        skipBytesInMemoryOverheadCheck,
+        new IndexSpec(),
+        0,
+        false,
+        0L,
+        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+        IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
+        basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory
+    );
+    metrics = new FireDepartmentMetrics();
+
+    IndexIO indexIO = new IndexIO(
+        objectMapper,
+        () -> 0
+    );
+    IndexMerger indexMerger = new IndexMergerV9(
+        objectMapper,
+        indexIO,
+        OffHeapMemorySegmentWriteOutMediumFactory.instance()
+    );
+
+    emitter = new ServiceEmitter(
+        "test",
+        "test",
+        new NoopEmitter()
+    );
+    emitter.start();
+    EmittingLogger.registerEmitter(emitter);
+    DataSegmentPusher dataSegmentPusher = new DataSegmentPusher()
+    {
+      private boolean mustFail = true;
+
+      @Deprecated
+      @Override
+      public String getPathForHadoop(String dataSource)
+      {
+        return getPathForHadoop();
+      }
+
+      @Override
+      public String getPathForHadoop()
+      {
+        throw new UnsupportedOperationException();
+      }
+
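+      // when enablePushFailure is true, fail every other push (starting with the first) so callers can exercise push-failure handling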
+      @Override
+      public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
+      {
+        if (enablePushFailure && mustFail) {
+          mustFail = false;
+          throw new IOException("Push failure test");
+        } else if (enablePushFailure) {
+          mustFail = true;
+        }
+        pushedSegments.add(segment);
+        return segment;
+      }
+
+      @Override
+      public Map<String, Object> makeLoadSpec(URI uri)
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+    appenderator = Appenderators.createOffline(
+        schema.getDataSource(),
+        schema,
+        tuningConfig,
+        metrics,
+        dataSegmentPusher,
+        objectMapper,
+        indexIO,
+        indexMerger,
+        rowIngestionMeters,
+        new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
+        useLegacyBatchProcessing
+    );
+  }
+
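+  // used when maxSizeInBytes is 0: default to roughly one third of the JVM's total memory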
+  private long getDefaultMaxBytesInMemory()
+  {
+    return (Runtime.getRuntime().totalMemory()) / 3;
+  }
+
+  public DataSchema getSchema()
+  {
+    return schema;
+  }
+
+  public AppenderatorConfig getTuningConfig()
+  {
+    return tuningConfig;
+  }
+
+  public FireDepartmentMetrics getMetrics()
+  {
+    return metrics;
+  }
+
+  public ObjectMapper getObjectMapper()
+  {
+    return objectMapper;
+  }
+
+  public Appenderator getAppenderator()
+  {
+    return appenderator;
+  }
+
+  public List<DataSegment> getPushedSegments()
+  {
+    return pushedSegments;
+  }
+
+  @Override
+  public void close() throws Exception
+  {
+    appenderator.close();
+    emitter.close();
+    FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
+  }
+
+  private static File createNewBasePersistDirectory()
+  {
+    return FileUtils.createTempDir("druid-batch-persist");
+  }
+
+
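+  // Minimal AppenderatorConfig implementation for these tests: partitionsSpec is left null and the intermediate
+  // persist period is effectively disabled since it does not apply to batch jobs.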
+  private static class TestIndexTuningConfig implements AppenderatorConfig
+  {
+    private final AppendableIndexSpec appendableIndexSpec;
+    private final int maxRowsInMemory;
+    private final long maxBytesInMemory;
+    private final boolean skipBytesInMemoryOverheadCheck;
+    private final int maxColumnsToMerge;
+    private final PartitionsSpec partitionsSpec;
+    private final IndexSpec indexSpec;
+    private final File basePersistDirectory;
+    private final int maxPendingPersists;
+    private final boolean reportParseExceptions;
+    private final long pushTimeout;
+    private final IndexSpec indexSpecForIntermediatePersists;
+    @Nullable
+    private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
+
+    public TestIndexTuningConfig(
+         AppendableIndexSpec appendableIndexSpec,
+         Integer maxRowsInMemory,
+         Long maxBytesInMemory,
+         Boolean skipBytesInMemoryOverheadCheck,
+         IndexSpec indexSpec,
+         Integer maxPendingPersists,
+         Boolean reportParseExceptions,
+         Long pushTimeout,
+         @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+         Integer maxColumnsToMerge,
+         File basePersistDirectory
+    )
+    {
+      this.appendableIndexSpec = appendableIndexSpec;
+      this.maxRowsInMemory = maxRowsInMemory;
+      this.maxBytesInMemory = maxBytesInMemory;
+      this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck;
+      this.indexSpec = indexSpec;
+      this.maxPendingPersists = maxPendingPersists;
+      this.reportParseExceptions = reportParseExceptions;
+      this.pushTimeout = pushTimeout;
+      this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
+      this.maxColumnsToMerge = maxColumnsToMerge;
+      this.basePersistDirectory = basePersistDirectory;
+
+      this.partitionsSpec = null;
+      this.indexSpecForIntermediatePersists = this.indexSpec;
+    }
+
+    @Override
+    public TestIndexTuningConfig withBasePersistDirectory(File dir)
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public AppendableIndexSpec getAppendableIndexSpec()
+    {
+      return appendableIndexSpec;
+    }
+    
+    @Override
+    public int getMaxRowsInMemory()
+    {
+      return maxRowsInMemory;
+    }
+    
+    @Override
+    public long getMaxBytesInMemory()
+    {
+      return maxBytesInMemory;
+    }
+    
+    @Override
+    public boolean isSkipBytesInMemoryOverheadCheck()
+    {
+      return skipBytesInMemoryOverheadCheck;
+    }
+    
+    @Nullable
+    @Override
+    public PartitionsSpec getPartitionsSpec()
+    {
+      return partitionsSpec;
+    }
+
+    @Override
+    public IndexSpec getIndexSpec()
+    {
+      return indexSpec;
+    }
+    
+    @Override
+    public IndexSpec getIndexSpecForIntermediatePersists()
+    {
+      return indexSpecForIntermediatePersists;
+    }
+    
+    @Override
+    public int getMaxPendingPersists()
+    {
+      return maxPendingPersists;
+    }
+
+    @Override
+    public boolean isReportParseExceptions()
+    {
+      return reportParseExceptions;
+    }
+
+    @Nullable
+    @Override
+    public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
+    {
+      return segmentWriteOutMediumFactory;
+    }
+
+    @Override
+    public int getMaxColumnsToMerge()
+    {
+      return maxColumnsToMerge;
+    }
+
+    @Override
+    public File getBasePersistDirectory()
+    {
+      return basePersistDirectory;
+    }
+
+    @Override
+    public Period getIntermediatePersistPeriod()
+    {
+      return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make much sense for batch jobs
+    }
+    
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TestIndexTuningConfig that = (TestIndexTuningConfig) o;
+      return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
+             maxRowsInMemory == that.maxRowsInMemory &&
+             maxBytesInMemory == that.maxBytesInMemory &&
+             skipBytesInMemoryOverheadCheck == that.skipBytesInMemoryOverheadCheck &&
+             maxColumnsToMerge == that.maxColumnsToMerge &&
+             maxPendingPersists == that.maxPendingPersists &&
+             reportParseExceptions == that.reportParseExceptions &&
+             pushTimeout == that.pushTimeout &&
+             Objects.equals(partitionsSpec, that.partitionsSpec) &&
+             Objects.equals(indexSpec, that.indexSpec) &&
+             Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) &&
+             Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
+             Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(
+          appendableIndexSpec,
+          maxRowsInMemory,
+          maxBytesInMemory,
+          skipBytesInMemoryOverheadCheck,
+          maxColumnsToMerge,
+          partitionsSpec,
+          indexSpec,
+          indexSpecForIntermediatePersists,
+          basePersistDirectory,
+          maxPendingPersists,
+          reportParseExceptions,
+          pushTimeout,
+          segmentWriteOutMediumFactory
+      );
+    }
+
+    @Override
+    public String toString()
+    {
+      return "IndexTuningConfig{" +
+             "maxRowsInMemory=" + maxRowsInMemory +
+             ", maxBytesInMemory=" + maxBytesInMemory +
+             ", skipBytesInMemoryOverheadCheck=" + skipBytesInMemoryOverheadCheck +
+             ", maxColumnsToMerge=" + maxColumnsToMerge +
+             ", partitionsSpec=" + partitionsSpec +
+             ", indexSpec=" + indexSpec +
+             ", indexSpecForIntermediatePersists=" + indexSpecForIntermediatePersists +
+             ", basePersistDirectory=" + basePersistDirectory +
+             ", maxPendingPersists=" + maxPendingPersists +
+             ", reportParseExceptions=" + reportParseExceptions +
+             ", pushTimeout=" + pushTimeout +
+             ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory +
+             '}';
+    }
+  }
+
+}
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
index 36c9c5b..f02458b 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
@@ -20,7 +20,6 @@
 package org.apache.druid.segment.realtime.appenderator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
 import com.google.inject.Injector;
@@ -43,7 +42,6 @@
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
-import org.apache.druid.segment.realtime.plumber.Committers;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -170,13 +168,13 @@
           "A",
           new LinearShardSpec(0)
       );
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
-      appenderator.add(identifier, AppenderatorTest.ir("2000", "bar", 1), Suppliers.ofInstance(Committers.nil()));
-      Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
-      appenderator.add(identifier, AppenderatorTest.ir("2000", "baz", 1), Suppliers.ofInstance(Committers.nil()));
-      Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.add(identifier, StreamAppenderatorTest.ir("2000", "bar", 1), null);
+      Assert.assertEquals(1, ((BatchAppenderator) appenderator).getRowsInMemory());
+      appenderator.add(identifier, StreamAppenderatorTest.ir("2000", "baz", 1), null);
+      Assert.assertEquals(2, ((BatchAppenderator) appenderator).getRowsInMemory());
       appenderator.close();
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((BatchAppenderator) appenderator).getRowsInMemory());
     }
     finally {
       appenderator.close();
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 408aa97..4f9cd3c 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -545,11 +545,5 @@
     {
       throw new UnsupportedOperationException();
     }
-    @Override
-    public boolean isRealTime()
-    {
-      return true;
-    }
-
   }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index fa1f7a9..b6154a6 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -95,7 +95,7 @@
   );
 
   private SegmentAllocator allocator;
-  private AppenderatorTester appenderatorTester;
+  private StreamAppenderatorTester streamAppenderatorTester;
   private TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
   private StreamAppenderatorDriver driver;
   private DataSegmentKiller dataSegmentKiller;
@@ -107,15 +107,15 @@
   @Before
   public void setUp()
   {
-    appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
+    streamAppenderatorTester = new StreamAppenderatorTester(MAX_ROWS_IN_MEMORY);
     allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
     segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
     dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
     driver = new StreamAppenderatorDriver(
-        appenderatorTester.getAppenderator(),
+        streamAppenderatorTester.getAppenderator(),
         allocator,
         segmentHandoffNotifierFactory,
-        new TestUsedSegmentChecker(appenderatorTester),
+        new TestUsedSegmentChecker(streamAppenderatorTester.getPushedSegments()),
         dataSegmentKiller,
         OBJECT_MAPPER,
         new FireDepartmentMetrics()
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
similarity index 83%
rename from server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
rename to server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index 0c1285d..bc123b1 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -61,7 +61,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class AppenderatorTest extends InitializedNullHandlingTest
+public class StreamAppenderatorTest extends InitializedNullHandlingTest
 {
   private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
       si("2000/2001", "A", 0),
@@ -72,7 +72,7 @@
   @Test
   public void testSimpleIngestion() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(2, true)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
       final Appenderator appenderator = tester.getAppenderator();
       boolean thrown;
 
@@ -83,7 +83,7 @@
       Assert.assertEquals(null, appenderator.startJob());
 
       // getDataSource
-      Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
+      Assert.assertEquals(StreamAppenderatorTester.DATASOURCE, appenderator.getDataSource());
 
       // add
       commitMetadata.put("x", "1");
@@ -157,7 +157,7 @@
   public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
   {
     try (
-        final AppenderatorTester tester = new AppenderatorTester(
+        final StreamAppenderatorTester tester = new StreamAppenderatorTester(
             100,
             1024,
             null,
@@ -193,15 +193,15 @@
       int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
       Assert.assertEquals(
           182 + nullHandlingOverhead,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
       );
       appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
       Assert.assertEquals(
           182 + nullHandlingOverhead,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
       );
       appenderator.close();
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
     }
   }
 
@@ -209,7 +209,7 @@
   public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
   {
     try (
-        final AppenderatorTester tester = new AppenderatorTester(
+        final StreamAppenderatorTester tester = new StreamAppenderatorTester(
             100,
             1024,
             null,
@@ -243,21 +243,21 @@
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
       //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182
       int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
-      Assert.assertEquals(182 + nullHandlingOverhead, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
+      Assert.assertEquals(182 + nullHandlingOverhead, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
       appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
       Assert.assertEquals(
           364 + 2 * nullHandlingOverhead,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
       appenderator.close();
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
     }
   }
 
   @Test
   public void testMaxBytesInMemory() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(100, 15000, true)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 15000, true)) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -285,38 +285,38 @@
       //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
       int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
       int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
-      int sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+      int sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
       // currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
       );
       Assert.assertEquals(
           currentInMemoryIndexSize + sinkSizeOverhead,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
 
       // We do multiple more adds to the same sink to cause persist.
       for (int i = 0; i < 53; i++) {
         appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
       }
-      sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+      sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
       // currHydrant size is 0 since we just persist all indexes to disk.
       currentInMemoryIndexSize = 0;
       // We are now over maxSizeInBytes after the add. Hence, we do a persist.
       // currHydrant in the sink has 0 bytesInMemory since we just did a persist
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
       );
       // Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
       // 1 dimension columns, 2 metric column, 1 time column.
-      int mappedIndexSize = 1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
-                            AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
-                            AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
+      int mappedIndexSize = 1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+                            StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
+                            StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
       Assert.assertEquals(
           currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
 
       // Add a single row after persisted
@@ -325,11 +325,11 @@
       currentInMemoryIndexSize = 182 + nullHandlingOverhead;
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
       );
       Assert.assertEquals(
           currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
 
       // We do multiple more adds to the same sink to cause persist.
@@ -342,28 +342,28 @@
       // currHydrant in the sink has 0 bytesInMemory since we just did a persist
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
       );
       // Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
       // 1 dimension columns, 2 metric column, 1 time column. However, we have two indexes now from the two pervious
       // persists.
-      mappedIndexSize = 2 * (1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
-                            AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
-                            AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
+      mappedIndexSize = 2 * (1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+                             StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
+                             StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
       Assert.assertEquals(
           currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
       appenderator.close();
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
     }
   }
 
   @Test(expected = RuntimeException.class)
   public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(100, 5180, true)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 5180, true)) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -394,7 +394,7 @@
   public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
   {
     try (
-        final AppenderatorTester tester = new AppenderatorTester(
+        final StreamAppenderatorTester tester = new StreamAppenderatorTester(
             100,
             10,
             null,
@@ -429,13 +429,13 @@
       // Expected 0 since we persisted after the add
       Assert.assertEquals(
           0,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
       // Expected 0 since we persisted after the add
       Assert.assertEquals(
           0,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
     }
   }
@@ -443,7 +443,7 @@
   @Test
   public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(100, 10000, true)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 10000, true)) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -471,24 +471,24 @@
       // Still under maxSizeInBytes after the add. Hence, we do not persist yet
       int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
       int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
-      int sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+      int sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
       Assert.assertEquals(
           currentInMemoryIndexSize + sinkSizeOverhead,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
 
       // Close with row still in memory (no persist)
       appenderator.close();
 
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
     }
   }
 
   @Test
   public void testMaxBytesInMemoryInMultipleSinks() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(100, 31100, true)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 31100, true)) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -518,19 +518,19 @@
       //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
       int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
       int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
-      int sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+      int sinkSizeOverhead = 2 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
       // currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
       );
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
       );
       Assert.assertEquals(
           (2 * currentInMemoryIndexSize) + sinkSizeOverhead,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
 
       // We do multiple more adds to the same sink to cause a persist.
@@ -538,27 +538,27 @@
         appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
         appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar_" + i, 1), committerSupplier);
       }
-      sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+      sinkSizeOverhead = 2 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
       // currHydrant size is 0 since we just persisted all indexes to disk.
       currentInMemoryIndexSize = 0;
       // We are now over maxSizeInBytes after the add. Hence, we do a persist.
       // currHydrant in the sink has 0 bytesInMemory since we just did a persist
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
       );
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
       );
       // Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
       // 1 dimension column, 2 metric columns, 1 time column.
-      int mappedIndexSize = 2 * (1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
-                            AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
-                            AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
+      int mappedIndexSize = 2 * (1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+                                 StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
+                                 StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
       Assert.assertEquals(
           currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
 
       // Add a single row to sink 0 after the persist
@@ -567,29 +567,29 @@
       currentInMemoryIndexSize = 182 + nullHandlingOverhead;
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
       );
       Assert.assertEquals(
           0,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
       );
       Assert.assertEquals(
           currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
       // Now add a single row to sink 1
       appenderator.add(IDENTIFIERS.get(1), ir("2000", "bob", 1), committerSupplier);
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
       );
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
       );
       Assert.assertEquals(
           (2 * currentInMemoryIndexSize) + sinkSizeOverhead + mappedIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
 
       // We do multiple more adds to both sinks to cause a persist.
@@ -603,32 +603,32 @@
       // currHydrant in the sink has 0 bytesInMemory since we just did a persist
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
       );
       Assert.assertEquals(
           currentInMemoryIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
       );
       // Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
       // 1 dimension column, 2 metric columns, 1 time column. However, we now have two indexes from the two previous
       // persists.
-      mappedIndexSize = 2 * (2 * (1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
-                             AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
-                             AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER));
+      mappedIndexSize = 2 * (2 * (1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+                                  StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
+                                  StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER));
       Assert.assertEquals(
           currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
       appenderator.close();
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
     }
   }
 
   @Test
   public void testIgnoreMaxBytesInMemory() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(100, -1, true)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, -1, true)) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -650,33 +650,33 @@
         };
       };
 
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.startJob();
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
       // we still calculate the size, even when ignoring it, to make the persist decision
       int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
       Assert.assertEquals(
           182 + nullHandlingOverhead,
-          ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
+          ((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
       );
-      Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
-      int sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
+      int sinkSizeOverhead = 2 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
       Assert.assertEquals(
           (364 + 2 * nullHandlingOverhead) + sinkSizeOverhead,
-          ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
+          ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
       );
-      Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.close();
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
     }
   }
 
   @Test
   public void testMaxRowsInMemory() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(3, true)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = new Supplier<Committer>()
@@ -703,23 +703,23 @@
         }
       };
 
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.startJob();
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
-      Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
-      Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
-      Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 1), committerSupplier);
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 1), committerSupplier);
-      Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier);
-      Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.persistAll(committerSupplier.get());
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.close();
     }
   }
@@ -727,7 +727,7 @@
   @Test
   public void testMaxRowsInMemoryDisallowIncrementalPersists() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(3, false)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, false)) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -749,23 +749,23 @@
         };
       };
 
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.startJob();
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier, false);
-      Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier, false);
-      Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier, false);
-      Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 1), committerSupplier, false);
-      Assert.assertEquals(3, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(3, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 1), committerSupplier, false);
-      Assert.assertEquals(4, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(4, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier, false);
-      Assert.assertEquals(5, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(5, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.persistAll(committerSupplier.get());
-      Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
+      Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
       appenderator.close();
     }
   }
@@ -774,7 +774,7 @@
   public void testRestoreFromDisk() throws Exception
   {
     final RealtimeTuningConfig tuningConfig;
-    try (final AppenderatorTester tester = new AppenderatorTester(2, true)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
       final Appenderator appenderator = tester.getAppenderator();
       tuningConfig = tester.getTuningConfig();
 
@@ -816,7 +816,7 @@
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 5), committerSupplier);
       appenderator.close();
 
-      try (final AppenderatorTester tester2 = new AppenderatorTester(
+      try (final StreamAppenderatorTester tester2 = new StreamAppenderatorTester(
           2,
           -1,
           tuningConfig.getBasePersistDirectory(),
@@ -833,7 +833,7 @@
   @Test(timeout = 60_000L)
   public void testTotalRowCount() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(3, true)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
       final Appenderator appenderator = tester.getAppenderator();
       final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
       final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata);
@@ -876,7 +876,7 @@
   public void testVerifyRowIngestionMetrics() throws Exception
   {
     final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
-    try (final AppenderatorTester tester = new AppenderatorTester(5, 10000L, null, false, rowIngestionMeters)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(5, 10000L, null, false, rowIngestionMeters)) {
       final Appenderator appenderator = tester.getAppenderator();
       appenderator.startJob();
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier());
@@ -892,7 +892,7 @@
   @Test
   public void testQueryByIntervals() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(2, true)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
       final Appenderator appenderator = tester.getAppenderator();
 
       appenderator.startJob();
@@ -906,7 +906,7 @@
 
       // Query1: 2000/2001
       final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
-                                           .dataSource(AppenderatorTester.DATASOURCE)
+                                           .dataSource(StreamAppenderatorTester.DATASOURCE)
                                            .intervals(ImmutableList.of(Intervals.of("2000/2001")))
                                            .aggregators(
                                                Arrays.asList(
@@ -932,7 +932,7 @@
 
       // Query2: 2000/2002
       final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
-                                           .dataSource(AppenderatorTester.DATASOURCE)
+                                           .dataSource(StreamAppenderatorTester.DATASOURCE)
                                            .intervals(ImmutableList.of(Intervals.of("2000/2002")))
                                            .aggregators(
                                                Arrays.asList(
@@ -962,7 +962,7 @@
 
       // Query3: 2000/2001T01
       final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
-                                           .dataSource(AppenderatorTester.DATASOURCE)
+                                           .dataSource(StreamAppenderatorTester.DATASOURCE)
                                            .intervals(ImmutableList.of(Intervals.of("2000/2001T01")))
                                            .aggregators(
                                                Arrays.asList(
@@ -991,7 +991,7 @@
 
       // Query4: 2000/2001T01, 2001T03/2001T04
       final TimeseriesQuery query4 = Druids.newTimeseriesQueryBuilder()
-                                           .dataSource(AppenderatorTester.DATASOURCE)
+                                           .dataSource(StreamAppenderatorTester.DATASOURCE)
                                            .intervals(
                                                ImmutableList.of(
                                                    Intervals.of("2000/2001T01"),
@@ -1028,7 +1028,7 @@
   @Test
   public void testQueryBySegments() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(2, true)) {
+    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
       final Appenderator appenderator = tester.getAppenderator();
 
       appenderator.startJob();
@@ -1042,7 +1042,7 @@
 
       // Query1: segment #2
       final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
-                                           .dataSource(AppenderatorTester.DATASOURCE)
+                                           .dataSource(StreamAppenderatorTester.DATASOURCE)
                                            .aggregators(
                                                Arrays.asList(
                                                    new LongSumAggregatorFactory("count", "count"),
@@ -1078,7 +1078,7 @@
 
       // Query2: segment #2, partial
       final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
-                                           .dataSource(AppenderatorTester.DATASOURCE)
+                                           .dataSource(StreamAppenderatorTester.DATASOURCE)
                                            .aggregators(
                                                Arrays.asList(
                                                    new LongSumAggregatorFactory("count", "count"),
@@ -1114,7 +1114,7 @@
 
       // Query3: segment #2, two disjoint intervals
       final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
-                                           .dataSource(AppenderatorTester.DATASOURCE)
+                                           .dataSource(StreamAppenderatorTester.DATASOURCE)
                                            .aggregators(
                                                Arrays.asList(
                                                    new LongSumAggregatorFactory("count", "count"),
@@ -1154,7 +1154,7 @@
       );
 
       final ScanQuery query4 = Druids.newScanQueryBuilder()
-                                     .dataSource(AppenderatorTester.DATASOURCE)
+                                     .dataSource(StreamAppenderatorTester.DATASOURCE)
                                      .intervals(
                                          new MultipleSpecificSegmentSpec(
                                              ImmutableList.of(
@@ -1194,7 +1194,7 @@
   private static SegmentIdWithShardSpec si(String interval, String version, int partitionNum)
   {
     return new SegmentIdWithShardSpec(
-        AppenderatorTester.DATASOURCE,
+        StreamAppenderatorTester.DATASOURCE,
         Intervals.of(interval),
         version,
         new LinearShardSpec(partitionNum)
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
similarity index 97%
rename from server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
rename to server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index 4940659..003e2fb 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -77,7 +77,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 
-public class AppenderatorTester implements AutoCloseable
+public class StreamAppenderatorTester implements AutoCloseable
 {
   public static final String DATASOURCE = "foo";
 
@@ -94,14 +94,14 @@
 
   private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
 
-  public AppenderatorTester(
+  public StreamAppenderatorTester(
       final int maxRowsInMemory
   )
   {
     this(maxRowsInMemory, -1, null, false);
   }
 
-  public AppenderatorTester(
+  public StreamAppenderatorTester(
       final int maxRowsInMemory,
       final boolean enablePushFailure
   )
@@ -109,7 +109,7 @@
     this(maxRowsInMemory, -1, null, enablePushFailure);
   }
 
-  public AppenderatorTester(
+  public StreamAppenderatorTester(
       final int maxRowsInMemory,
       final long maxSizeInBytes,
       final boolean enablePushFailure
@@ -118,7 +118,7 @@
     this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure);
   }
 
-  public AppenderatorTester(
+  public StreamAppenderatorTester(
       final int maxRowsInMemory,
       final long maxSizeInBytes,
       final File basePersistDirectory,
@@ -128,7 +128,7 @@
     this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters(), false);
   }
 
-  public AppenderatorTester(
+  public StreamAppenderatorTester(
       final int maxRowsInMemory,
       final long maxSizeInBytes,
       final File basePersistDirectory,
@@ -139,7 +139,7 @@
     this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, rowIngestionMeters, false);
   }
 
-  public AppenderatorTester(
+  public StreamAppenderatorTester(
       final int maxRowsInMemory,
       final long maxSizeInBytes,
       final File basePersistDirectory,
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java
index 9db997b..2c7fd56 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestUsedSegmentChecker.java
@@ -26,22 +26,23 @@
 import org.apache.druid.timeline.partition.PartitionChunk;
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 public class TestUsedSegmentChecker implements UsedSegmentChecker
 {
-  private final AppenderatorTester appenderatorTester;
+  private final List<DataSegment> pushedSegments;
 
-  public TestUsedSegmentChecker(AppenderatorTester appenderatorTester)
+  public TestUsedSegmentChecker(List<DataSegment> pushedSegments)
   {
-    this.appenderatorTester = appenderatorTester;
+    this.pushedSegments = pushedSegments;
   }
 
   @Override
   public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers)
   {
     final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
-    VersionedIntervalTimeline.addSegments(timeline, appenderatorTester.getPushedSegments().iterator());
+    VersionedIntervalTimeline.addSegments(timeline, pushedSegments.iterator());
 
     final Set<DataSegment> retVal = new HashSet<>();
     for (SegmentIdWithShardSpec identifier : identifiers) {