Merge pull request #1408 from MabelYC/interfaceChange

SAMZA-2574: improve flexibility of SystemFactory interface
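
This patch replaces ContainerStorageManager's dedicated side input read and
flush threads with a single RunLoop driving new SideInputTask instances, and
uses per-task CountDownLatches (counted down by TaskSideInputHandler) to
signal when side inputs have caught up. It also adds an optional metrics
prefix to SamzaContainerMetrics and TaskInstanceMetrics, includes the job
config in the diagnostics stream messages emitted by DiagnosticsManager,
makes StreamPartitionCountMonitor threads daemons, and adds a
usingAsyncLogger option to the log4j2 StreamAppender.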
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
index 7d76c8c..ee4490f 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
@@ -238,7 +238,9 @@
     private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
 
     public Thread newThread(Runnable runnable) {
-      return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
+      Thread thread = new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
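+      // Mark monitor threads as daemons so they do not prevent JVM shutdown.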
+      thread.setDaemon(true);
+      return thread;
     }
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java b/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
new file mode 100644
index 0000000..6274c15
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.scheduler.EpochTimeScheduler;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.ReadableCoordinator;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class encapsulates the processing logic for side input streams. It is executed by {@link org.apache.samza.container.RunLoop}.
+ */
+public class SideInputTask implements RunLoopTask {
+  private static final Logger LOG = LoggerFactory.getLogger(SideInputTask.class);
+
+  private final TaskName taskName;
+  private final Set<SystemStreamPartition> taskSSPs;
+  private final TaskSideInputHandler taskSideInputHandler;
+  private final TaskInstanceMetrics metrics;
+
+  public SideInputTask(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPs,
+      TaskSideInputHandler taskSideInputHandler,
+      TaskInstanceMetrics metrics) {
+    this.taskName = taskName;
+    this.taskSSPs = taskSSPs;
+    this.taskSideInputHandler = taskSideInputHandler;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public TaskName taskName() {
+    return this.taskName;
+  }
+
+  @Override
+  public synchronized void process(IncomingMessageEnvelope envelope, ReadableCoordinator coordinator,
+      TaskCallbackFactory callbackFactory) {
+    TaskCallback callback = callbackFactory.createCallback();
+    this.metrics.processes().inc();
+    try {
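+      // Delegate to the side input handler, which applies the message to its stores and records the processed offset.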
+      this.taskSideInputHandler.process(envelope);
+      this.metrics.messagesActuallyProcessed().inc();
+      callback.complete();
+    } catch (Exception e) {
+      callback.failure(e);
+    }
+  }
+
+  @Override
+  public void window(ReadableCoordinator coordinator) {
+    throw new UnsupportedOperationException("Windowing is not applicable for side input tasks.");
+  }
+
+  @Override
+  public void scheduler(ReadableCoordinator coordinator) {
+    throw new UnsupportedOperationException("Scheduling is not applicable for side input tasks.");
+  }
+
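+  // process() and commit() are synchronized so a commit cannot interleave with in-flight message processing.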
+  @Override
+  public synchronized void commit() {
+    this.taskSideInputHandler.flush();
+    this.metrics.commits().inc();
+  }
+
+  @Override
+  public void endOfStream(ReadableCoordinator coordinator) {
+    LOG.info("Task {} has reached end of stream", this.taskName);
+  }
+
+  @Override
+  public boolean isWindowableTask() {
+    return false;
+  }
+
+  @Override
+  public Set<String> intermediateStreams() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Set<SystemStreamPartition> systemStreamPartitions() {
+    return this.taskSSPs;
+  }
+
+  @Override
+  public OffsetManager offsetManager() {
+    return null;
+  }
+
+  @Override
+  public TaskInstanceMetrics metrics() {
+    return this.metrics;
+  }
+
+  @Override
+  public EpochTimeScheduler epochTimeScheduler() {
+    return null;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index e292df9..f352bd0 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -248,7 +248,7 @@
               this.getSerdes(),
               jobConfig,
               new HashMap<>(),
-              new SamzaContainerMetrics(containerModel.getId(), new MetricsRegistryMap()),
+              new SamzaContainerMetrics(containerModel.getId(), new MetricsRegistryMap(), ""),
               JobContextImpl.fromConfigWithDefaults(jobConfig),
               containerContext,
               new HashMap<>(),
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
index 7ab4036..767b9ce 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
@@ -38,6 +39,7 @@
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
@@ -63,19 +65,23 @@
   private final Map<String, SideInputsProcessor> storeToProcessor;
   private final SystemAdmins systemAdmins;
   private final StreamMetadataCache streamMetadataCache;
+  // indicates to ContainerStorageManager that all side input SSPs in this task are caught up
+  private final CountDownLatch taskCaughtUpLatch;
 
+  private Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata;
   private Map<SystemStreamPartition, String> startingOffsets;
 
   public TaskSideInputHandler(TaskName taskName, TaskMode taskMode, File storeBaseDir,
       Map<String, StorageEngine> storeToStorageEngines, Map<String, Set<SystemStreamPartition>> storeToSSPs,
       Map<String, SideInputsProcessor> storeToProcessor, SystemAdmins systemAdmins,
-      StreamMetadataCache streamMetadataCache, Clock clock) {
+      StreamMetadataCache streamMetadataCache, CountDownLatch taskCaughtUpLatch, Clock clock) {
     validateProcessorConfiguration(storeToSSPs.keySet(), storeToProcessor);
 
     this.taskName = taskName;
     this.systemAdmins = systemAdmins;
     this.streamMetadataCache = streamMetadataCache;
     this.storeToProcessor = storeToProcessor;
+    this.taskCaughtUpLatch = taskCaughtUpLatch;
 
     this.sspToStores = new HashMap<>();
     storeToSSPs.forEach((store, ssps) -> {
@@ -119,6 +125,30 @@
 
     this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
     LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+
+    this.initialSideInputSSPMetadata = getInitialSideInputSSPMetadata();
+    LOG.info("Task {} will catch up to offsets {}", this.taskName, this.initialSideInputSSPMetadata);
+
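+    // An SSP may already be caught up at startup (e.g. for an empty or fully consumed stream), so check each
+    // starting offset against the UPCOMING offset immediately.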
+    this.startingOffsets.forEach((ssp, offset) -> checkCaughtUp(ssp, offset, SystemStreamMetadata.OffsetType.UPCOMING));
+  }
+
+  /**
+   * Retrieves the current metadata, including the newest offset, for each side input SSP of this task.
+   *
+   * @return a map from SSP to its partition metadata; SSPs with no available metadata are omitted
+   */
+  private Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getInitialSideInputSSPMetadata() {
+    Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata = new HashMap<>();
+    for (SystemStreamPartition ssp : this.sspToStores.keySet()) {
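+      // Fetch full stream metadata (not just partition counts) so per-partition offsets are available.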
+      boolean partitionsMetadataOnly = false;
+      SystemStreamMetadata systemStreamMetadata = this.streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), partitionsMetadataOnly);
+      if (systemStreamMetadata != null) {
+        SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
+            systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
+        initialSideInputSSPMetadata.put(ssp, sspMetadata);
+      }
+    }
+    return initialSideInputSSPMetadata;
   }
 
   /**
@@ -150,6 +180,7 @@
     }
 
     this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset);
+    checkCaughtUp(envelopeSSP, envelopeOffset, SystemStreamMetadata.OffsetType.NEWEST);
   }
 
   /**
@@ -193,9 +224,9 @@
   }
 
   /**
-   * Gets the starting offsets for the {@link SystemStreamPartition}s belonging to all the side input stores.
-   * If the local file offset is available and is greater than the oldest available offset from source, uses it,
-   * else falls back to oldest offset in the source.
+   * Gets the starting offsets for the {@link SystemStreamPartition}s belonging to all the side input stores. See doc
+   * of {@link StorageManagerUtil#getStartingOffset} for how file offsets and oldest offsets for each SSP are
+   * reconciled.
    *
    * @param fileOffsets offsets from the local offset file
    * @param oldestOffsets oldest offsets from the source
@@ -260,6 +291,43 @@
   }
 
   /**
+   * An SSP is considered caught up once the offset indicated for it in {@link #initialSideInputSSPMetadata} has been
+   * processed. Once the set of SSPs to catch up becomes empty, the task's latch is counted down, notifying
+   * {@link ContainerStorageManager} that the task is caught up.
+   *
+   * @param ssp The SSP to be checked
+   * @param currentOffset The offset to be checked
+   * @param offsetTypeToCheck The type of offset to compare {@code currentOffset} against.
+   */
+  private void checkCaughtUp(SystemStreamPartition ssp, String currentOffset, SystemStreamMetadata.OffsetType offsetTypeToCheck) {
+    SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata = this.initialSideInputSSPMetadata.get(ssp);
+    String offsetToCheck = sspMetadata == null ? null : sspMetadata.getOffset(offsetTypeToCheck);
+
+    LOG.trace("Checking offset {} against {} offset {} for {}.", currentOffset, offsetTypeToCheck, offsetToCheck, ssp);
+
+    Integer comparatorResult;
+    if (currentOffset == null || offsetToCheck == null) {
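+      // A missing offset on either side means we cannot confirm catch-up; treat the SSP as still lagging.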
+      comparatorResult = -1;
+    } else {
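+      // Offset formats are system-specific, so delegate the comparison to that system's admin.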
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+      comparatorResult = systemAdmin.offsetComparator(currentOffset, offsetToCheck);
+    }
+
+    // The SSP is no longer lagging once the current offset is greater than or equal to the
+    // offset being checked.
+    if (comparatorResult != null && comparatorResult.intValue() >= 0) {
+      LOG.info("Side input SSP {} has caught up to offset {}.", ssp, offsetToCheck);
+      // once it's caught up, remove the SSP from the map
+      this.initialSideInputSSPMetadata.remove(ssp);
+      if (this.initialSideInputSSPMetadata.isEmpty()) {
+        // if the metadata map is now empty, all SSPs in the task are caught up, so count down the latch;
+        // this will only happen once, when the last SSP catches up
+        this.taskCaughtUpLatch.countDown();
+      }
+    }
+  }
+
+  /**
    * Validates that each store has an associated {@link SideInputsProcessor}
    */
   private void validateProcessorConfiguration(Set<String> stores, Map<String, SideInputsProcessor> storeToProcessor) {
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 6fa4388..0f7ff76 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -152,7 +152,7 @@
               new StorageConfig(config).getNumPersistentStores(), maxHeapSizeBytes, containerThreadPoolSize,
               containerId, execEnvContainerId.orElse(""), taskClassVersion, samzaVersion, hostName,
               diagnosticsSystemStream, systemProducer,
-              Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled());
+              Duration.ofMillis(new TaskConfig(config).getShutdownMs()), jobConfig.getAutosizingEnabled(), config);
 
       diagnosticsManagerReporterPair = Optional.of(new ImmutablePair<>(diagnosticsManager, diagnosticsReporter));
     }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index e0b2fdc..73dcd20 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -26,7 +26,8 @@
 
 class SamzaContainerMetrics(
   val source: String = "unknown",
-  val registry: ReadableMetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  val registry: ReadableMetricsRegistry = new MetricsRegistryMap,
+  val prefix: String = "") extends MetricsHelper {
 
   val commits = newCounter("commit-calls")
   val windows = newCounter("window-calls")
@@ -54,4 +55,5 @@
     taskStoreRestorationMetrics.put(taskName, newGauge("%s-restore-time" format(taskName.toString), -1L))
   }
 
+  override def getPrefix: String = prefix
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
index 94cfbdc..bdd773c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -26,7 +26,8 @@
 
 class TaskInstanceMetrics(
   val source: String = "unknown",
-  val registry: ReadableMetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  val registry: ReadableMetricsRegistry = new MetricsRegistryMap,
+  val prefix: String = "") extends MetricsHelper {
 
   val commits = newCounter("commit-calls")
   val windows = newCounter("window-calls")
@@ -41,4 +42,6 @@
   def addOffsetGauge(systemStreamPartition: SystemStreamPartition, getValue: () => String) {
     newGauge("%s-%s-%d-offset" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), getValue)
   }
+
+  override def getPrefix: String = prefix
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index f77dab8..93ca566 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.Config;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -68,6 +69,7 @@
   private final int containerThreadPoolSize;
   private final Map<String, ContainerModel> containerModels;
   private final boolean autosizingEnabled;
+  private final Config config;
   private boolean jobParamsEmitted = false;
 
   private final SystemProducer systemProducer; // SystemProducer for writing diagnostics data
@@ -93,12 +95,14 @@
       String hostname,
       SystemStream diagnosticSystemStream,
       SystemProducer systemProducer,
-      Duration terminationDuration, boolean autosizingEnabled) {
+      Duration terminationDuration,
+      boolean autosizingEnabled,
+      Config config) {
 
     this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numPersistentStores, maxHeapSizeBytes, containerThreadPoolSize,
         containerId, executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer,
         terminationDuration, Executors.newSingleThreadScheduledExecutor(
-            new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled);
+            new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()), autosizingEnabled, config);
   }
 
   @VisibleForTesting
@@ -118,7 +122,9 @@
       SystemStream diagnosticSystemStream,
       SystemProducer systemProducer,
       Duration terminationDuration,
-      ScheduledExecutorService executorService, boolean autosizingEnabled) {
+      ScheduledExecutorService executorService,
+      boolean autosizingEnabled,
+      Config config) {
     this.jobName = jobName;
     this.jobId = jobId;
     this.containerModels = containerModels;
@@ -140,6 +146,7 @@
     this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList with default size and time parameters
     this.scheduler = executorService;
     this.autosizingEnabled = autosizingEnabled;
+    this.config = config;
 
     resetTime = Instant.now();
     this.systemProducer.register(getClass().getSimpleName());
@@ -208,6 +215,7 @@
           diagnosticsStreamMessage.addMaxHeapSize(maxHeapSizeBytes);
           diagnosticsStreamMessage.addContainerThreadPoolSize(containerThreadPoolSize);
           diagnosticsStreamMessage.addAutosizingEnabled(autosizingEnabled);
+          diagnosticsStreamMessage.addConfig(config);
         }
 
         // Add stop event list to the message
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
index 15cce03..bea7ce2 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
@@ -24,6 +24,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.metrics.reporter.Metrics;
 import org.apache.samza.metrics.reporter.MetricsHeader;
@@ -60,6 +62,7 @@
   private static final String CONTAINER_THREAD_POOL_SIZE_METRIC_NAME = "containerThreadPoolSize";
   private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels";
   private static final String AUTOSIZING_ENABLED_METRIC_NAME = "autosizingEnabled";
+  private static final String CONFIG_METRIC_NAME = "config";
 
   private final MetricsHeader metricsHeader;
   private final Map<String, Map<String, Object>> metricsMessage;
@@ -156,6 +159,14 @@
   }
 
   /**
+   * Add the job's config to the message.
+   * @param config the config to add.
+   */
+  public void addConfig(Config config) {
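+    // Config implements Map<String, String>, so it can be stored in the metrics message directly.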
+    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONFIG_METRIC_NAME, (Map<String, String>) config);
+  }
+
+  /**
   * Convert this message into a {@link MetricsSnapshot}, useful for serialization and deserialization using {@link org.apache.samza.serializers.MetricsSnapshotSerde}.
   * @return the equivalent {@link MetricsSnapshot}
    */
@@ -228,6 +239,14 @@
     return (Boolean) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, AUTOSIZING_ENABLED_METRIC_NAME);
   }
 
+  /**
+   * This method gets the config of the job from the MetricsMessage.
+   * @return the config of the job.
+   */
+  public Config getConfig() {
+    return new MapConfig((Map<String, String>) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONFIG_METRIC_NAME));
+  }
+
   // Helper method to get a {@link DiagnosticsStreamMessage} from a {@link MetricsSnapshot}.
   // This is typically used when deserializing messages from a diagnostics stream.
@@ -254,6 +273,7 @@
       diagnosticsStreamMessage.addContainerThreadPoolSize((Integer) diagnosticsManagerGroupMap.get(CONTAINER_THREAD_POOL_SIZE_METRIC_NAME));
       diagnosticsStreamMessage.addProcessorStopEvents((List<ProcessorStopEvent>) diagnosticsManagerGroupMap.get(STOP_EVENT_LIST_METRIC_NAME));
       diagnosticsStreamMessage.addAutosizingEnabled((Boolean) diagnosticsManagerGroupMap.get(AUTOSIZING_ENABLED_METRIC_NAME));
+      diagnosticsStreamMessage.addConfig(new MapConfig((Map<String, String>) diagnosticsManagerGroupMap.get(CONFIG_METRIC_NAME)));
     }
 
     if (containerMetricsGroupMap != null && containerMetricsGroupMap.containsKey(EXCEPTION_LIST_METRIC_NAME)) {
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 19411b4..412a3c1 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -23,7 +23,6 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.File;
 import java.nio.file.Path;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -34,14 +33,12 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.samza.SamzaException;
@@ -50,6 +47,8 @@
 import org.apache.samza.config.Config;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.RunLoop;
+import org.apache.samza.container.RunLoopTask;
 import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
@@ -68,7 +67,6 @@
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SSPMetadataCache;
 import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemConsumers;
@@ -110,14 +108,13 @@
 public class ContainerStorageManager {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
-  private static final String SIDEINPUTS_READ_THREAD_NAME = "SideInputs Read Thread";
-  private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush Thread";
+  private static final String SIDEINPUTS_THREAD_NAME = "SideInputs Thread";
   private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
   // We use a prefix to differentiate the SystemConsumersMetrics for sideInputs from the ones in SamzaContainer
 
-  private static final int SIDE_INPUT_READ_THREAD_TIMEOUT_SECONDS = 10; // Timeout with which sideinput read thread checks for exceptions
-  private static final Duration SIDE_INPUT_FLUSH_TIMEOUT = Duration.ofMinutes(1); // Period with which sideinputs are flushed
-
+  // Timeout with which the side input thread checks for exceptions and whether SSPs are caught up
+  private static final int SIDE_INPUT_CHECK_TIMEOUT_SECONDS = 10;
+  private static final int SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS = 60;
 
   /** Maps containing relevant per-task objects */
   private final Map<TaskName, Map<String, StorageEngine>> taskStores;
@@ -154,17 +151,13 @@
   private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs;
   private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers;
   private SystemConsumers sideInputSystemConsumers;
-  private final Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
-      = new ConcurrentHashMap<>(); // Recorded sspMetadata of the taskSideInputSSPs recorded at start, used to determine when sideInputs are caughtup and container init can proceed
-  private volatile CountDownLatch sideInputsCaughtUp; // Used by the sideInput-read thread to signal to the main thread
+  private volatile Map<TaskName, CountDownLatch> sideInputTaskLatches; // Used by the side input run loop to signal to the main thread when each task has caught up
   private volatile boolean shouldShutdown = false;
+  private RunLoop sideInputRunLoop;
 
-  private final ExecutorService sideInputsReadExecutor = Executors.newSingleThreadExecutor(
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_READ_THREAD_NAME).build());
+  private final ExecutorService sideInputsExecutor = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_THREAD_NAME).build());
 
-  private final ScheduledExecutorService sideInputsFlushExecutor = Executors.newSingleThreadScheduledExecutor(
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_FLUSH_THREAD_NAME).build());
-  private ScheduledFuture sideInputsFlushFuture;
   private volatile Throwable sideInputException = null;
 
   private final Config config;
@@ -195,6 +188,7 @@
     this.checkpointManager = checkpointManager;
     this.containerModel = containerModel;
     this.taskSideInputStoreSSPs = getTaskSideInputSSPs(containerModel, sideInputSystemStreams);
+    this.sideInputTaskLatches = new HashMap<>();
     this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream()
         .flatMap(m -> m.values().stream())
         .flatMap(Collection::stream)
@@ -603,28 +597,35 @@
 
         Map<String, StorageEngine> sideInputStores = getSideInputStores(taskName);
         Map<String, Set<SystemStreamPartition>> sideInputStoresToSSPs = new HashMap<>();
-
+        boolean taskHasSideInputs = false;
         for (String storeName : sideInputStores.keySet()) {
           Set<SystemStreamPartition> storeSSPs = this.taskSideInputStoreSSPs.get(taskName).get(storeName);
+          taskHasSideInputs = taskHasSideInputs || !storeSSPs.isEmpty();
           sideInputStoresToSSPs.put(storeName, storeSSPs);
         }
 
-        TaskSideInputHandler taskSideInputHandler = new TaskSideInputHandler(taskName,
-            taskModel.getTaskMode(),
-            loggedStoreBaseDirectory,
-            sideInputStores,
-            sideInputStoresToSSPs,
-            taskSideInputProcessors.get(taskName),
-            this.systemAdmins,
-            this.streamMetadataCache,
-            clock);
+        if (taskHasSideInputs) {
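+          // One latch per task; TaskSideInputHandler counts it down once all of the task's SSPs have caught up.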
+          CountDownLatch taskCountDownLatch = new CountDownLatch(1);
+          this.sideInputTaskLatches.put(taskName, taskCountDownLatch);
 
-        sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> {
-          handlers.put(ssp, taskSideInputHandler);
-        });
+          TaskSideInputHandler taskSideInputHandler = new TaskSideInputHandler(taskName,
+              taskModel.getTaskMode(),
+              loggedStoreBaseDirectory,
+              sideInputStores,
+              sideInputStoresToSSPs,
+              taskSideInputProcessors.get(taskName),
+              this.systemAdmins,
+              this.streamMetadataCache,
+              taskCountDownLatch,
+              clock);
 
-        LOG.info("Created TaskSideInputHandler for task {}, sideInputStores {} and loggedStoreBaseDirectory {}",
-            taskName, sideInputStores, loggedStoreBaseDirectory);
+          sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> {
+            handlers.put(ssp, taskSideInputHandler);
+          });
+
+          LOG.info("Created TaskSideInputHandler for task {}, sideInputStores {} and loggedStoreBaseDirectory {}",
+              taskName, sideInputStores, loggedStoreBaseDirectory);
+        }
       });
     }
     return handlers;
@@ -728,22 +729,26 @@
     // initialize the sideInputStorageManagers
     getSideInputHandlers().forEach(TaskSideInputHandler::init);
 
-    // start the checkpointing thread at the commit-ms frequency
-    TaskConfig taskConfig = new TaskConfig(config);
-    sideInputsFlushFuture = sideInputsFlushExecutor.scheduleWithFixedDelay(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          getSideInputHandlers().forEach(TaskSideInputHandler::flush);
-        } catch (Exception e) {
-          LOG.error("Exception during flushing sideInputs", e);
-          sideInputException = e;
-        }
-      }
-    }, 0, taskConfig.getCommitMs(), TimeUnit.MILLISECONDS);
+    Map<TaskName, TaskSideInputHandler> taskSideInputHandlers = this.sspSideInputHandlers.values().stream()
+        .distinct()
+        .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity()));
 
-    // set the latch to the number of sideInput SSPs
-    this.sideInputsCaughtUp = new CountDownLatch(this.sspSideInputHandlers.keySet().size());
+    Map<TaskName, TaskInstanceMetrics> sideInputTaskMetrics = new HashMap<>();
+    Map<TaskName, RunLoopTask> sideInputTasks = new HashMap<>();
+    this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> {
+      Set<SystemStreamPartition> taskSSPs = this.taskSideInputStoreSSPs.get(taskName).values().stream()
+          .flatMap(Set::stream)
+          .collect(Collectors.toSet());
+
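+      // Only tasks that actually own side input SSPs get a SideInputTask and dedicated side input metrics.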
+      if (!taskSSPs.isEmpty()) {
+        String sideInputSource = SIDEINPUTS_METRICS_PREFIX + this.taskInstanceMetrics.get(taskName).source();
+        TaskInstanceMetrics sideInputMetrics = new TaskInstanceMetrics(sideInputSource, this.taskInstanceMetrics.get(taskName).registry(), SIDEINPUTS_METRICS_PREFIX);
+        sideInputTaskMetrics.put(taskName, sideInputMetrics);
+
+        RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs, taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName));
+        sideInputTasks.put(taskName, sideInputTask);
+      }
+    });
 
     // register all sideInput SSPs with the consumers
     for (SystemStreamPartition ssp : this.sspSideInputHandlers.keySet()) {
@@ -758,41 +763,36 @@
       sideInputSystemConsumers.register(ssp, startingOffset);
       taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
           ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
-
-      SystemStreamMetadata systemStreamMetadata = streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
-      SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
-          (systemStreamMetadata == null) ? null : systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
-
-      // record a copy of the sspMetadata, to later check if its caught up
-      initialSideInputSSPMetadata.put(ssp, sspMetadata);
-
-      // check if the ssp is caught to upcoming, even at start
-      checkSideInputCaughtUp(ssp, startingOffset, SystemStreamMetadata.OffsetType.UPCOMING, false);
+      sideInputTaskMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+          ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
     }
 
     // start the systemConsumers for consuming input
     this.sideInputSystemConsumers.start();
 
+    TaskConfig taskConfig = new TaskConfig(this.config);
+    SamzaContainerMetrics sideInputContainerMetrics =
+        new SamzaContainerMetrics(SIDEINPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(),
+            this.samzaContainerMetrics.registry(), SIDEINPUTS_METRICS_PREFIX);
+
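+    // Drive side input consumption with a dedicated RunLoop so processing, commits, and callbacks follow the
+    // same lifecycle as the main container run loop.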
+    this.sideInputRunLoop = new RunLoop(sideInputTasks,
+        null, // all operations are executed in the main runloop thread
+        this.sideInputSystemConsumers,
+        1, // single message in flight per task
+        -1, // no windowing
+        taskConfig.getCommitMs(),
+        taskConfig.getCallbackTimeoutMs(),
+        // TODO consolidate these container configs SAMZA-2275
+        this.config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)),
+        taskConfig.getMaxIdleMs(),
+        sideInputContainerMetrics,
+        System::nanoTime,
+        false); // commit must be synchronous to ensure integrity of state flush
 
     try {
-
-    // submit the sideInput read runnable
-      sideInputsReadExecutor.submit(() -> {
+      sideInputsExecutor.submit(() -> {
         try {
-          while (!shouldShutdown) {
-            IncomingMessageEnvelope envelope = sideInputSystemConsumers.choose(true);
-
-            if (envelope != null) {
-              if (!envelope.isEndOfStream()) {
-                this.sspSideInputHandlers.get(envelope.getSystemStreamPartition()).process(envelope);
-              }
-
-              checkSideInputCaughtUp(envelope.getSystemStreamPartition(), envelope.getOffset(),
-                  SystemStreamMetadata.OffsetType.NEWEST, envelope.isEndOfStream());
-            } else {
-              LOG.trace("No incoming message was available");
-            }
-          }
+          sideInputRunLoop.run();
         } catch (Exception e) {
           LOG.error("Exception in reading sideInputs", e);
           sideInputException = e;
@@ -801,7 +801,7 @@
 
      // Make the main thread wait until all sideInputs have been caught up or an exception was thrown
       while (!shouldShutdown && sideInputException == null &&
-          !this.sideInputsCaughtUp.await(SIDE_INPUT_READ_THREAD_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+          !awaitSideInputTasks()) {
         LOG.debug("Waiting for SideInput bootstrap to complete");
       }
 
@@ -824,39 +824,22 @@
     LOG.info("SideInput Restore complete");
   }
 
-  // Method to check if the given offset means the stream is caught up for reads
-  private void checkSideInputCaughtUp(SystemStreamPartition ssp, String offset, SystemStreamMetadata.OffsetType offsetType, boolean isEndOfStream) {
-
-    if (isEndOfStream) {
-      this.initialSideInputSSPMetadata.remove(ssp);
-      this.sideInputsCaughtUp.countDown();
-      LOG.info("Side input ssp {} has caught up to offset {} ({}).", ssp, offset, offsetType);
-      return;
+  /**
+   * Waits up to a timeout for all side input tasks to catch up.
+   *
+   * @return false if waiting on any latch timed out; true otherwise
+   *
+   * @throws InterruptedException if waiting on any of the latches is interrupted
+   */
+  private boolean awaitSideInputTasks() throws InterruptedException {
+    long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(SIDE_INPUT_CHECK_TIMEOUT_SECONDS);
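+    // Share a single timeout budget across all task latches so the total wait stays bounded.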
+    for (CountDownLatch latch : this.sideInputTaskLatches.values()) {
+      long remainingMillisToWait = endTime - System.currentTimeMillis();
+      if (remainingMillisToWait <= 0 || !latch.await(remainingMillisToWait, TimeUnit.MILLISECONDS)) {
+        return false;
+      }
     }
-
-    SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata = this.initialSideInputSSPMetadata.get(ssp);
-    String offsetToCheck = sspMetadata == null ? null : sspMetadata.getOffset(offsetType);
-    LOG.trace("Checking {} offset {} against {} for {}.", offsetType, offset, offsetToCheck, ssp);
-
-    // Let's compare offset of the chosen message with offsetToCheck.
-    Integer comparatorResult;
-    if (offset == null || offsetToCheck == null) {
-      comparatorResult = -1;
-    } else {
-      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
-      comparatorResult = systemAdmin.offsetComparator(offset, offsetToCheck);
-    }
-
-    // The SSP is no longer lagging if the envelope's offset is greater than or equal to the
-    // latest offset.
-    if (comparatorResult != null && comparatorResult.intValue() >= 0) {
-
-      LOG.info("Side input ssp {} has caught up to offset {} ({}).", ssp, offset, offsetType);
-      // if its caught up, we remove the ssp from the map, and countDown the latch
-      this.initialSideInputSSPMetadata.remove(ssp);
-      this.sideInputsCaughtUp.countDown();
-      return;
-    }
+    return true;
   }
 
   /**
@@ -901,19 +884,16 @@
 
     // stop all sideinput consumers and stores
     if (this.hasSideInputs) {
-      sideInputsReadExecutor.shutdownNow();
-
-      this.sideInputSystemConsumers.stop();
-
-      // cancel all future sideInput flushes, shutdown the executor, and await for finish
-      sideInputsFlushFuture.cancel(false);
-      sideInputsFlushExecutor.shutdown();
+      this.sideInputRunLoop.shutdown();
+      this.sideInputsExecutor.shutdown();
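+      // RunLoop shutdown has been requested; wait for the executor thread to finish before stopping the consumers.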
       try {
-        sideInputsFlushExecutor.awaitTermination(SIDE_INPUT_FLUSH_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+        this.sideInputsExecutor.awaitTermination(SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         throw new SamzaException("Exception while shutting down sideInputs", e);
       }
 
+      this.sideInputSystemConsumers.stop();
+
       // stop all sideInputStores -- this will perform one last flush on the KV stores, and write the offset file
       this.getSideInputHandlers().forEach(TaskSideInputHandler::stop);
     }
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index da855a1..90d4c33 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -50,7 +50,7 @@
 public class TestRunLoop {
   // Immutable objects shared by all test methods.
   private final ExecutorService executor = null;
-  private final SamzaContainerMetrics containerMetrics = new SamzaContainerMetrics("container", new MetricsRegistryMap());
+  private final SamzaContainerMetrics containerMetrics = new SamzaContainerMetrics("container", new MetricsRegistryMap(), "");
   private final long windowMs = -1;
   private final long commitMs = -1;
   private final long callbackTimeoutMs = 0;
@@ -522,7 +522,7 @@
   private RunLoopTask getMockRunLoopTask(TaskName taskName, SystemStreamPartition ssp0) {
     RunLoopTask task0 = mock(RunLoopTask.class);
     when(task0.systemStreamPartitions()).thenReturn(Collections.singleton(ssp0));
-    when(task0.metrics()).thenReturn(new TaskInstanceMetrics("test", new MetricsRegistryMap()));
+    when(task0.metrics()).thenReturn(new TaskInstanceMetrics("test", new MetricsRegistryMap(), ""));
     when(task0.taskName()).thenReturn(taskName);
     return task0;
   }
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
index 6429a54..8ff58eb 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.diagnostics;
 
+import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,6 +29,8 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.metrics.reporter.MetricsSnapshot;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
@@ -59,6 +62,9 @@
   private int numPersistentStores = 2;
   private int containerNumCores = 2;
   private boolean autosizingEnabled = false;
+  private Config config = new MapConfig(ImmutableMap.of("job.name", jobName, "job.id", jobId,
+      "cluster-manager.container.memory.mb", "1024", "cluster-manager.container. cpu.cores", "1",
+      "cluster-manager.container.retry.count", "8"));
   private Map<String, ContainerModel> containerModels = TestDiagnosticsStreamMessage.getSampleContainerModels();
   private Collection<DiagnosticsExceptionEvent> exceptionEventList = TestDiagnosticsStreamMessage.getExceptionList();
 
@@ -80,7 +86,7 @@
     this.diagnosticsManager =
         new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores, maxHeapSize, containerThreadPoolSize,
             "0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream,
-            mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, autosizingEnabled);
+            mockSystemProducer, Duration.ofSeconds(1), mockExecutorService, autosizingEnabled, config);
 
     exceptionEventList.forEach(
       diagnosticsExceptionEvent -> this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent));
@@ -95,7 +101,7 @@
         new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
             maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
             hostname, diagnosticsSystemStream, mockSystemProducer, Duration.ofSeconds(1), mockExecutorService,
-            autosizingEnabled);
+            autosizingEnabled, config);
 
     diagnosticsManager.start();
 
@@ -114,7 +120,7 @@
         new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
             maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
             hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService,
-            autosizingEnabled);
+            autosizingEnabled, config);
 
     diagnosticsManager.stop();
 
@@ -134,7 +140,7 @@
         new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numPersistentStores,
             maxHeapSize, containerThreadPoolSize, "0", executionEnvContainerId, taskClassVersion, samzaVersion,
             hostname, diagnosticsSystemStream, mockSystemProducer, terminationDuration, mockExecutorService,
-            autosizingEnabled);
+            autosizingEnabled, config);
 
     diagnosticsManager.stop();
 
@@ -272,6 +278,7 @@
     Assert.assertEquals(containerNumCores, diagnosticsStreamMessage.getContainerNumCores().intValue());
     Assert.assertEquals(numPersistentStores, diagnosticsStreamMessage.getNumPersistentStores().intValue());
     Assert.assertEquals(autosizingEnabled, diagnosticsStreamMessage.getAutosizingEnabled());
+    Assert.assertEquals(config, diagnosticsStreamMessage.getConfig());
   }
 
   private class MockSystemProducer implements SystemProducer {
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
index cd506b2..72b1b5f 100644
--- a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.diagnostics;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -26,6 +27,8 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
@@ -46,6 +49,7 @@
   private final String hostname = "sample host name";
   private final long timestamp = System.currentTimeMillis();
   private final long resetTimestamp = System.currentTimeMillis();
+  private final Config config = new MapConfig(ImmutableMap.of("job.name", jobName, "job.id", jobId));
 
   private DiagnosticsStreamMessage getDiagnosticsStreamMessage() {
     DiagnosticsStreamMessage diagnosticsStreamMessage =
@@ -55,6 +59,7 @@
     diagnosticsStreamMessage.addContainerMb(1024);
     diagnosticsStreamMessage.addContainerNumCores(2);
     diagnosticsStreamMessage.addNumPersistentStores(3);
+    diagnosticsStreamMessage.addConfig(config);
 
     diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
     return diagnosticsStreamMessage;
@@ -107,6 +112,7 @@
     Assert.assertEquals(1024, (int) diagnosticsStreamMessage.getContainerMb());
     Assert.assertEquals(2, (int) diagnosticsStreamMessage.getContainerNumCores());
     Assert.assertEquals(3, (int) diagnosticsStreamMessage.getNumPersistentStores());
+    Assert.assertEquals(config, diagnosticsStreamMessage.getConfig());
     Assert.assertEquals(exceptionEventList, diagnosticsStreamMessage.getExceptionEvents());
     Assert.assertEquals(getSampleContainerModels(), diagnosticsStreamMessage.getContainerModels());
     Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), getProcessorStopEventList());
@@ -139,6 +145,7 @@
     Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerNumCores"));
     Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerMemoryMb"));
     Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("stopEvents"));
+    Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("config"));
 
     DiagnosticsStreamMessage convertedDiagnosticsStreamMessage =
         DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
index 656b2ef..2ef206e 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -191,6 +192,7 @@
           storeToProcessor,
           systemAdmins,
           streamMetadataCache,
+          new CountDownLatch(1),
           clock));
     }
   }
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
index ab8a29e..c7b3f47 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
@@ -38,7 +38,7 @@
 
   @Before
   public void setup() {
-    TaskInstanceMetrics metrics = new TaskInstanceMetrics("Partition 0", new MetricsRegistryMap());
+    TaskInstanceMetrics metrics = new TaskInstanceMetrics("Partition 0", new MetricsRegistryMap(), "");
     listener = new TaskCallbackListener() {
       @Override
       public void onComplete(TaskCallback callback) {
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index a9c0c09..fc6cb53 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -24,7 +24,7 @@
 import org.apache.samza.util.Logging
 import org.apache.samza.storage.{StorageEngine, StoreProperties}
 import org.apache.samza.system.{ChangelogSSPIterator, OutgoingMessageEnvelope, SystemStreamPartition}
-import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.{MessageCollector, TaskInstanceCollector}
 import org.apache.samza.util.TimerUtil
 import java.nio.file.Path
 import java.util.Optional
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index 21f9357..bfba754 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -25,12 +25,11 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.Filter;
 import org.apache.logging.log4j.core.Layout;
 import org.apache.logging.log4j.core.LogEvent;
@@ -48,14 +47,15 @@
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.Log4jSystemConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.logging.log4j2.serializers.LoggingEventJsonSerdeFactory;
-import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
@@ -67,6 +67,7 @@
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.ExponentialSleepStrategy;
 import org.apache.samza.util.HttpUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.ReflectionUtil;
 
 @Plugin(name = "Stream", category = "Core", elementType = "appender", printObject = true)
@@ -79,36 +80,44 @@
   // Hidden config for now. Will move to appropriate Config class when ready to.
   private static final String CREATE_STREAM_ENABLED = "task.log4j.create.stream.enabled";
 
-  protected static final int DEFAULT_QUEUE_SIZE = 100;
   private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Arbitrary choice
+  private final BlockingQueue<byte[]> logQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
 
-  protected static volatile boolean systemInitialized = false;
-
-  private Config config = null;
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
   private String key = null;
-  private String streamName = null;
+  private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
+  private String containerName = null;
   private int partitionCount = 0;
   private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
-  private Logger log = LogManager.getLogger(StreamAppender.class);
-  protected StreamAppenderMetrics metrics;
-
-  private final BlockingQueue<byte[]> logQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
-  protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
   private Thread transferThread;
+  private Config config = null;
+  private String streamName = null;
+  private final boolean usingAsyncLogger;
 
-  protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, String streamName) {
+  /**
+   * Used to detect whether this appender is called recursively.
+   */
+  private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
+
+  protected static final int DEFAULT_QUEUE_SIZE = 100;
+  protected static volatile boolean systemInitialized = false;
+  protected StreamAppenderMetrics metrics;
+  protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
+
+  protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
+      boolean usingAsyncLogger, String streamName) {
     super(name, filter, layout, ignoreExceptions);
     this.streamName = streamName;
+    this.usingAsyncLogger = usingAsyncLogger;
   }
 
   @Override
   public void start() {
     super.start();
-    String containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
+    containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
     if (containerName != null) {
       isApplicationMaster = containerName.contains(JOB_COORDINATOR_TAG);
     } else {
@@ -116,6 +125,13 @@
           ". This is used as the key for the log appender, so can't proceed.");
     }
     key = containerName; // use the container name as the key for the logs
+    try {
+      // Serialize the key once, since we will use it for every event.
+      keyBytes = key.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException(
+          String.format("Container name: %s could not be encoded to bytes. %s cannot proceed.", key, getName()), e);
+    }
 
     // StreamAppender has to wait until the JobCoordinator is up when the log is in the AM
     if (isApplicationMaster) {
@@ -127,12 +143,7 @@
   }
 
   /**
-   * used to detect if this thread is called recursively
-   */
-  private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
-
-  /**
-   * Getter for the StreamName parameter. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
+   * Getter for the StreamName parameter. See also {@link #createAppender(String, Filter, Layout, boolean, boolean, String)} for when this is called.
    * Example: {@literal <param name="StreamName" value="ExampleStreamName"/>}
    * @return The configured stream name.
    */
@@ -141,7 +152,17 @@
   }
 
   /**
-   * Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
+   * Getter for the Config parameter. The config is fetched lazily on first access.
+   * @return the Samza config
+   */
+  protected Config getConfig() {
+    if (config == null) {
+      config = fetchConfig();
+    }
+    return this.config;
+  }
+
+  /**
+   * Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, boolean, String)} for when this is called.
    * Example: {@literal <param name="PartitionCount" value="4"/>}
    * @return The configured partition count of the StreamAppender stream. If not set, returns {@link JobConfig#getContainerCount()}.
    */
@@ -153,7 +174,7 @@
   }
 
   /**
-   * Setter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
+   * Setter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, boolean, String)} for when this is called.
    * Example: {@literal <param name="PartitionCount" value="4"/>}
    * @param partitionCount Configurable partition count.
    */
@@ -168,8 +189,9 @@
       @PluginElement("Filter") final Filter filter,
       @PluginElement("Layout") Layout layout,
       @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
+      @PluginAttribute(value = "usingAsyncLogger", defaultBoolean = false) final boolean usingAsyncLogger,
       @PluginAttribute("streamName") String streamName) {
-    return new StreamAppender(name, filter, layout, ignoreExceptions, streamName);
+    return new StreamAppender(name, filter, layout, ignoreExceptions, usingAsyncLogger, streamName);
   }
 
   @Override
@@ -183,38 +205,17 @@
             setupSystem();
             systemInitialized = true;
           } else {
-            log.trace("Waiting for the JobCoordinator to be instantiated...");
+            System.out.println("Waiting for the JobCoordinator to be instantiated...");
           }
         } else {
-          // Serialize the event before adding to the queue to leverage the caller thread
-          // and ensure that the transferThread can keep up.
-          if (!logQueue.offer(serde.toBytes(subLog(event)), queueTimeoutS, TimeUnit.SECONDS)) {
-            // Do NOT retry adding system to the queue. Dropping the event allows us to alleviate the unlikely
-            // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
-            // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
-            // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
-
-            // Scenario:
-            // T1: holds L1 and is waiting for L2
-            // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
-
-            // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
-            // so dropping events in the StreamAppender is our best recourse.
-
-            // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
-            int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
-            log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
-                queueTimeoutS,
-                systemStream.toString(),
-                messagesDropped));
-
-            // Emit a metric which can be monitored to ensure it doesn't happen often.
-            metrics.logMessagesDropped.inc(messagesDropped);
-          }
-          metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
+          // handle the event according to whether the async or sync logger is in use
+          handleEvent(event);
         }
       } catch (Exception e) {
-        System.err.println("[StreamAppender] Error sending log message:");
+        if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here.
+          metrics.logMessagesErrors.inc();
+        }
+        System.err.println(String.format("[%s] Error sending log message:", getName()));
         e.printStackTrace();
       } finally {
         recursiveCall.set(false);
@@ -224,6 +225,50 @@
     }
   }
 
+  /**
+   * If the async logger is enabled, the log event is serialized and sent directly to the SystemProducer. Otherwise,
+   * the serialized event is added to a bounded blocking queue before returning to the "synchronous" caller.
+   * @param event the log event to append
+   * @throws InterruptedException if interrupted while waiting to enqueue the serialized event
+   */
+  private void handleEvent(LogEvent event) throws InterruptedException {
+    if (usingAsyncLogger) {
+      sendEventToSystemProducer(encodeLogEventToBytes(event));
+      return;
+    }
+
+    // Serialize the event before adding to the queue to leverage the caller thread
+    // and ensure that the transferThread can keep up.
+    if (!logQueue.offer(encodeLogEventToBytes(event), queueTimeoutS, TimeUnit.SECONDS)) {
+      // Do NOT retry adding the event to the queue. Dropping the event allows us to alleviate the unlikely
+      // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
+      // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
+      // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
+
+      // Scenario:
+      // T1: holds L1 and is waiting for L2
+      // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
+
+      // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
+      // so dropping events in the StreamAppender is our best recourse.
+
+      // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
+      int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
+      System.err.println(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
+          queueTimeoutS,
+          systemStream.toString(),
+          messagesDropped));
+
+      // Emit a metric which can be monitored to ensure it doesn't happen often.
+      metrics.logMessagesDropped.inc(messagesDropped);
+    }
+    metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
+  }
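
For illustration, a minimal sketch of wiring the new flag programmatically (the appender name, layout pattern, and attribute values here are assumptions; the factory signature is the one declared above):

    PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
    // usingAsyncLogger = true sends each serialized event straight to the SystemProducer,
    // bypassing the bounded queue and the transfer thread.
    StreamAppender appender = StreamAppender.createAppender(
        "samza-logs", null /* filter */, layout, true /* ignoreExceptions */,
        true /* usingAsyncLogger */, null /* streamName: defaults to __samza_<jobName>_<jobId>_logs */);
    appender.start();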
+
+  protected byte[] encodeLogEventToBytes(LogEvent event) {
+    return serde.toBytes(subLog(event));
+  }
+
   private Message subAppend(LogEvent event) {
     if (getLayout() == null) {
       return new SimpleMessage(event.getMessage().getFormattedMessage());
@@ -239,7 +284,7 @@
     }
   }
 
-  private LogEvent subLog(LogEvent event) {
+  protected LogEvent subLog(LogEvent event) {
     return Log4jLogEvent.newBuilder()
         .setLevel(event.getLevel())
         .setLoggerName(event.getLoggerName())
@@ -256,12 +301,12 @@
 
   @Override
   public void stop() {
-    log.info("Shutting down the StreamAppender...");
+    System.out.println(String.format("Shutting down the %s...", getName()));
     transferThread.interrupt();
     try {
       transferThread.join();
     } catch (InterruptedException e) {
-      log.error("Interrupted while waiting for transfer thread to finish.", e);
+      System.err.println("Interrupted while waiting for transfer thread to finish." + e);
       Thread.currentThread().interrupt();
     }
 
@@ -285,7 +330,7 @@
    *
    * @return Config the config of this container
    */
-  protected Config getConfig() {
+  private Config fetchConfig() {
     Config config;
 
     try {
@@ -305,29 +350,18 @@
     return config;
   }
 
-  protected void setupSystem() {
-    config = getConfig();
-    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+  protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
+    return new Log4jSystemConfig(config);
+  }
 
-    if (streamName == null) {
-      streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
-    }
+  protected StreamAppenderMetrics getMetrics(MetricsRegistryMap metricsRegistry) {
+    return new StreamAppenderMetrics(getName(), metricsRegistry);
+  }
 
-    // TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters!
-    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
-    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
-
-    String systemName = log4jSystemConfig.getSystemName();
-    String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName)
-        .orElseThrow(() -> new SamzaException(
-            "Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use"));
-    SystemFactory systemFactory = ReflectionUtil.getObj(systemFactoryName, SystemFactory.class);
-
-    setSerde(log4jSystemConfig, systemName, streamName);
-
+  protected void setupStream(SystemFactory systemFactory, String systemName) {
     if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
       // Explicitly create stream appender stream with the partition count the same as the number of containers.
-      System.out.println("[StreamAppender] creating stream " + streamName + " with partition count " + getPartitionCount());
+      System.out.println(String.format("[%s] creating stream ", getName()) + streamName + " with partition count " + getPartitionCount());
       StreamSpec streamSpec =
           StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
 
@@ -337,55 +371,81 @@
       systemAdmin.createStream(streamSpec);
       systemAdmin.stop();
     }
+  }
+
+  protected void setupSystem() {
+    config = getConfig();
+    Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
+
+    if (streamName == null) {
+      streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
+    }
+
+    // Instantiate metrics
+    MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();
+    // Use the appender name as the metrics prefix so that subclasses report under their own name
+    metrics = getMetrics(metricsRegistry);
+    // Register metrics into metrics reporters so that they are able to be reported to other systems
+    Map<String, MetricsReporter>
+        metricsReporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), containerName);
+    metricsReporters.values().forEach(reporter -> {
+      reporter.register(containerName, metricsRegistry);
+      reporter.start();
+    });
+
+    String systemName = log4jSystemConfig.getSystemName();
+    String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName)
+        .orElseThrow(() -> new SamzaException(
+            "Could not figure out \"" + systemName + "\" system factory for log4j " + getName() + " to use"));
+    SystemFactory systemFactory = ReflectionUtil.getObj(systemFactoryName, SystemFactory.class);
+
+    setSerde(log4jSystemConfig, systemName);
+
+    setupStream(systemFactory, systemName);
 
     systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry, this.getClass().getSimpleName());
     systemStream = new SystemStream(systemName, streamName);
     systemProducer.register(SOURCE);
     systemProducer.start();
 
-    log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
+    System.out.println(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
         + " in " + systemName + ". Logs are partitioned by " + key);
 
     startTransferThread();
   }
 
   private void startTransferThread() {
-
-    try {
-      // Serialize the key once, since we will use it for every event.
-      final byte[] keyBytes = key.getBytes("UTF-8");
-
-      Runnable transferFromQueueToSystem = () -> {
-        while (!Thread.currentThread().isInterrupted()) {
-          try {
-            byte[] serializedLogEvent = logQueue.take();
-
-            OutgoingMessageEnvelope outgoingMessageEnvelope =
-                new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent);
-            systemProducer.send(SOURCE, outgoingMessageEnvelope);
-
-          } catch (InterruptedException e) {
-            // Preserve the interrupted status for the loop condition.
-            Thread.currentThread().interrupt();
-          } catch (Throwable t) {
-            log.error("Error sending StreamAppender event to SystemProducer", t);
-          }
+    Runnable transferFromQueueToSystem = () -> {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          sendEventToSystemProducer(logQueue.take());
+        } catch (InterruptedException e) {
+          // Preserve the interrupted status for the loop condition.
+          Thread.currentThread().interrupt();
+        } catch (Throwable t) {
+          metrics.logMessagesErrors.inc();
+          System.err.println("Error sending " + getName() + " event to SystemProducer " + t);
         }
-      };
+      }
+    };
 
-      transferThread = new Thread(transferFromQueueToSystem);
-      transferThread.setDaemon(true);
-      transferThread.setName("Samza StreamAppender Producer " + transferThread.getName());
-      transferThread.start();
-
-    } catch (UnsupportedEncodingException e) {
-      throw new SamzaException(String.format(
-          "Container name: %s could not be encoded to bytes. StreamAppender cannot proceed.", key),
-          e);
-    }
+    transferThread = new Thread(transferFromQueueToSystem);
+    transferThread.setDaemon(true);
+    transferThread.setName("Samza " + getName() + " Producer " + transferThread.getName());
+    transferThread.start();
   }
 
-  protected static String getStreamName(String jobName, String jobId) {
+  /**
+   * Helper method to send a serialized log event to the SystemProducer and increment the corresponding metrics.
+   * @param serializedLogEvent the serialized log event to send
+   */
+  private void sendEventToSystemProducer(byte[] serializedLogEvent) {
+    metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
+    metrics.logMessagesCountSent.inc();
+    systemProducer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent));
+  }
+
+  protected String getStreamName(String jobName, String jobId) {
     if (jobName == null) {
       throw new SamzaException("job name is null. Please specify job.name");
     }
@@ -402,9 +462,8 @@
    *
    * @param log4jSystemConfig log4jSystemConfig for this appender
    * @param systemName name of the system
-   * @param streamName name of the stream
    */
-  private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, String streamName) {
+  protected void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName) {
     String serdeClass = LoggingEventJsonSerdeFactory.class.getCanonicalName();
     String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, streamName);
 
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
index 38f613c..466a520 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
@@ -34,10 +34,22 @@
   /** The number of log messages dropped e.g. because of buffer overflow. Does not include recursive calls. */
   public final Counter logMessagesDropped;
 
+  /** The number of log messages that could not be sent due to errors, e.g. serialization errors or SystemProducer send errors. */
+  public final Counter logMessagesErrors;
+
+  /** The total size in bytes of log messages sent to the SystemProducer. */
+  public final Counter logMessagesBytesSent;
+
+  /** The number of log messages sent to the SystemProducer. */
+  public final Counter logMessagesCountSent;
+
   public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
-    super(prefix, registry);
+    super(prefix + "-", registry);
     bufferFillPct = newGauge("buffer-fill-percent", 0);
     recursiveCalls = newCounter("recursive-calls");
     logMessagesDropped = newCounter("log-messages-dropped");
+    logMessagesErrors = newCounter("log-messages-errors");
+    logMessagesBytesSent = newCounter("log-messages-bytes-sent");
+    logMessagesCountSent = newCounter("log-messages-count-sent");
   }
 }
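
A rough sketch of how the new counters are driven (the wiring mirrors setupSystem() and sendEventToSystemProducer() above; the prefix and increment values are assumptions for illustration):

    MetricsRegistryMap registry = new MetricsRegistryMap();
    StreamAppenderMetrics metrics = new StreamAppenderMetrics("stream-appender", registry);
    metrics.logMessagesBytesSent.inc(256); // 256 serialized bytes handed to the SystemProducer
    metrics.logMessagesCountSent.inc();    // one envelope sent
    metrics.logMessagesErrors.inc();       // one failed serialization or send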
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
index 07df10c..ce6f081 100644
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
@@ -41,8 +41,8 @@
 class MockSystemProducerAppender extends StreamAppender {
   private static Config config;
 
-  protected MockSystemProducerAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, Config config, String streamName) {
-    super(name, filter, layout, ignoreExceptions, streamName);
+  protected MockSystemProducerAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, final boolean usingAsyncLogger, Config config, String streamName) {
+    super(name, filter, layout, ignoreExceptions, usingAsyncLogger, streamName);
   }
 
   @PluginFactory
@@ -51,6 +51,7 @@
       @PluginElement("Filter") final Filter filter,
       @PluginElement("Layout") Layout<? extends Serializable> layout,
       @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
+      @PluginAttribute(value = "usingAsyncLogger", defaultBoolean = false) final boolean usingAsyncLogger,
       @PluginElement("Config") final Config testConfig,
       @PluginAttribute("streamName") String streamName) {
     if (testConfig == null) {
@@ -58,7 +59,7 @@
     } else {
       config = testConfig;
     }
-    return new MockSystemProducerAppender(name, filter, layout, ignoreExceptions, config, streamName);
+    return new MockSystemProducerAppender(name, filter, layout, ignoreExceptions, usingAsyncLogger, config, streamName);
   }
 
   @Override
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
index 0248343..2cfd59b 100644
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
@@ -19,21 +19,28 @@
 
 package org.apache.samza.logging.log4j2;
 
-import static org.junit.Assert.*;
-
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.core.Logger;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.ConfigurationFactory;
+import org.apache.logging.log4j.core.config.ConfigurationSource;
+import org.apache.logging.log4j.core.config.Order;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import org.apache.logging.log4j.core.config.builder.api.RootLoggerComponentBuilder;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
 import org.apache.logging.log4j.core.layout.PatternLayout;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.logging.log4j2.serializers.LoggingEventJsonSerde;
@@ -43,6 +50,8 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class TestStreamAppender {
 
   static Logger log = (Logger) LogManager.getLogger(TestStreamAppender.class);
@@ -59,7 +68,8 @@
   public void testDefaultSerde() {
     System.setProperty("samza.container.name", "samza-container-1");
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
     assertNotNull(systemProducerAppender.getSerde());
     assertEquals(LoggingEventJsonSerde.class, systemProducerAppender.getSerde().getClass());
@@ -68,16 +78,15 @@
   @Test
   public void testNonDefaultSerde() {
     System.setProperty("samza.container.name", "samza-container-1");
-    String streamName = StreamAppender.getStreamName("log4jTest", "1");
     Map<String, String> map = new HashMap<String, String>();
     map.put("job.name", "log4jTest");
     map.put("job.id", "1");
     map.put("serializers.registry.log4j-string.class", LoggingEventStringSerdeFactory.class.getCanonicalName());
     map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
-    map.put("systems.mock.streams." + streamName + ".samza.msg.serde", "log4j-string");
+    map.put("systems.mock.streams.__samza_log4jTest_1_logs.samza.msg.serde", "log4j-string");
     map.put("task.log4j.system", "mock");
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, new MapConfig(map), null);
+    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, new MapConfig(map), null);
     systemProducerAppender.start();
     assertNotNull(systemProducerAppender.getSerde());
     assertEquals(LoggingEventStringSerde.class, systemProducerAppender.getSerde().getClass());
@@ -87,7 +96,7 @@
   public void testDefaultStreamName() {
     System.setProperty("samza.container.name", "samza-container-1");
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
     Assert.assertEquals("__samza_log4jTest_1_logs", systemProducerAppender.getStreamName());
@@ -97,7 +106,7 @@
   public void testCustomStreamName() {
     System.setProperty("samza.container.name", "samza-container-1");
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, "test-stream-name");
+    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, "test-stream-name");
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
     Assert.assertEquals("test-stream-name", systemProducerAppender.getStreamName());
@@ -108,7 +117,8 @@
     System.setProperty("samza.container.name", "samza-container-1");
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
 
     log.addAppender(systemProducerAppender);
@@ -119,11 +129,58 @@
   }
 
   @Test
+  public void testSystemProducerAppenderInContainerWithAsyncLogger() throws InterruptedException {
+    System.setProperty("samza.container.name", "samza-container-1");
+    // Enabling async logger on log4j2 programmatically
+    ConfigurationFactory.setConfigurationFactory(new AsyncLoggerConfigurationFactory());
+
+    PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, true, null, null);
+    systemProducerAppender.start();
+    log.addAppender(systemProducerAppender);
+    log.setLevel(Level.INFO);
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    logAndVerifyMessages(messages);
+    systemProducerAppender.stop();
+  }
+
+  @Plugin(name = "AsyncLoggerConfigurationFactory", category = ConfigurationFactory.CATEGORY)
+  @Order(50)
+  public static class AsyncLoggerConfigurationFactory extends ConfigurationFactory {
+
+    private static Configuration createConfiguration(final String name, ConfigurationBuilder<BuiltConfiguration> builder) {
+      builder.setConfigurationName(name);
+      RootLoggerComponentBuilder rootLoggerBuilder = builder.newAsyncRootLogger(Level.INFO);
+      builder.add(rootLoggerBuilder);
+      return builder.build();
+    }
+
+    @Override
+    public Configuration getConfiguration(final LoggerContext loggerContext, final ConfigurationSource source) {
+      return getConfiguration(loggerContext, source.toString(), null);
+    }
+
+    @Override
+    public Configuration getConfiguration(final LoggerContext loggerContext, final String name, final URI configLocation) {
+      ConfigurationBuilder<BuiltConfiguration> builder = newConfigurationBuilder();
+      return createConfiguration(name, builder);
+    }
+
+    @Override
+    protected String[] getSupportedTypes() {
+      return new String[]{"*"};
+    }
+  }
+
+  @Test
   public void testSystemProducerAppenderInAM() throws InterruptedException {
     System.setProperty("samza.container.name", "samza-job-coordinator");
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
     log.setLevel(Level.INFO);
@@ -143,7 +200,8 @@
     System.setProperty("samza.container.name", "samza-container-1");
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
 
@@ -162,7 +220,8 @@
         "task.log4j.system", "mock"));
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, mapConfig, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, mapConfig, null);
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
 
@@ -172,7 +231,8 @@
   @Test
   public void testDefaultPartitionCount() {
     System.setProperty("samza.container.name", "samza-container-1");
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, null, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, null, false, false, null, null);
     Assert.assertEquals(1, systemProducerAppender.getPartitionCount()); // job.container.count defaults to 1
 
     Map<String, String> map = new HashMap<>();
@@ -181,10 +241,12 @@
     map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
     map.put("task.log4j.system", "mock");
     map.put("job.container.count", "4");
-    systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, null, false, new MapConfig(map), null);
+    systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, null, false, false, new MapConfig(map), null);
     Assert.assertEquals(4, systemProducerAppender.getPartitionCount());
 
-    systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, null, false, null, null);
+    systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, null, false, false, null, null);
     systemProducerAppender.setPartitionCount(8);
     Assert.assertEquals(8, systemProducerAppender.getPartitionCount());
   }
@@ -194,7 +256,8 @@
     System.setProperty("samza.container.name", "samza-container-1");
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
     log.setLevel(Level.INFO);
@@ -224,7 +287,8 @@
     System.setProperty("samza.container.name", "samza-container-1");
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.queueTimeoutS = 1;
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 16a320e..79f58e8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -21,7 +21,6 @@
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -269,26 +268,27 @@
   private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer flattenIndex,
       MessageStream<SamzaSqlRelMessage> inputStream) {
     return inputStream.flatMap(message -> {
-      Object field = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex);
-      if (field != null && field instanceof List) {
-        List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
+      Object targetFlattenColumn = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex);
+      final List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
+      if (targetFlattenColumn != null && targetFlattenColumn instanceof List) {
+        List<Object> objectList = (List<Object>) targetFlattenColumn;
         SamzaSqlRelMsgMetadata messageMetadata = message.getSamzaSqlRelMsgMetadata();
         SamzaSqlRelMsgMetadata newMetadata =
             new SamzaSqlRelMsgMetadata(messageMetadata.getEventTime(), messageMetadata.getArrivalTime(),
                 messageMetadata.getScanTimeNanos(), messageMetadata.getScanTimeMillis());
-        for (Object fieldValue : (List) field) {
+        for (Object fieldValue : objectList) {
           List<Object> newValues = new ArrayList<>(message.getSamzaSqlRelRecord().getFieldValues());
-          newValues.set(flattenIndex, Collections.singletonList(fieldValue));
+          newValues.set(flattenIndex, fieldValue);
           outMessages.add(
               new SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues, newMetadata));
           newMetadata = new SamzaSqlRelMsgMetadata(newMetadata.getEventTime(), newMetadata.getArrivalTime(),
               newMetadata.getScanTimeNanos(), newMetadata.getScanTimeMillis());
         }
-        return outMessages;
       } else {
         message.getSamzaSqlRelMsgMetadata().isNewInputMessage = true;
-        return Collections.singletonList(message);
+        outMessages.add(message);
       }
+      return outMessages;
     });
   }
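
To make the behavior change concrete, a worked example with made-up values:

    // Input row:  fieldNames = [id, array_values], fieldValues = [7, ["a", "b"]]
    // Before: two output rows with array_values = ["a"] and ["b"] (each element re-wrapped in a singleton list)
    // After:  two output rows with array_values = "a" and "b" (the element itself)
    // Rows whose flatten column is null or not a List pass through unchanged as a single row.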
 
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index ca78af2..4a515c0 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -27,11 +27,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.planner.SamzaSqlValidator;
@@ -422,15 +422,14 @@
   }
 
   @Test
-  public void testEndToEndFlatten() throws Exception {
+  public void testEndToEndFlatten() {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
 
-    LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
     String sql1 =
-        "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0) "
-            + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0 "
+        "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0, array_values) "
+            + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0, array_values"
             + " from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -442,12 +441,29 @@
 
     List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
 
-    int expectedMessages = 0;
-    // Flatten de-normalizes the data. So there is separate record for each entry in the array.
-    for (int index = 1; index < numMessages; index++) {
-      expectedMessages = expectedMessages + Math.max(1, index);
-    }
+    // Test invariant: the input row with rank i carries an array_values column with i elements, so the
+    // flattened output has sum_{i=1}^{numMessages-1} i = numMessages * (numMessages - 1) / 2 messages.
+    int expectedMessages = (numMessages * (numMessages - 1)) / 2;
     Assert.assertEquals(expectedMessages, outMessages.size());
+
+    // check that string_value is non-null and appears in the original array_values column
+    Optional<GenericRecord> nullValueRecord = outMessages.stream()
+        .map(x -> (GenericRecord) x.getMessage())
+        .filter(x -> x.get("string_value") == null)
+        .findFirst();
+    // The string_value column is produced by flattening array_values, so each value must appear in that array
+    Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
+      String value = (String) x.get("string_value");
+      List<Object> arrayValues = (List<Object>) x.get("array_values");
+      if (arrayValues == null) {
+        return true;
+      }
+      Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
+      return !notThere.isPresent();
+    }).findFirst();
+
+    Assert.assertFalse("Null value " +  nullValueRecord.orElse(null), nullValueRecord.isPresent());
+    Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
   }
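
The count invariant asserted in this and the following flatten tests, written out (n = numMessages; the input row with rank i carries i elements in array_values):

    \sum_{i=1}^{n-1} i = \frac{n(n-1)}{2}\,, \qquad n = 20 \;\Rightarrow\; 190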
 
 
@@ -475,7 +491,7 @@
   }
 
   @Test
-  public void testEndToEndWithFloatToStringConversion() throws Exception {
+  public void testEndToEndWithFloatToStringConversion() {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -609,16 +625,34 @@
 
     List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
 
-    int expectedMessages = 0;
+    // Test invariant: the input row with rank i carries an array_values column with i elements, so the
+    // flattened output has numMessages * (numMessages - 1) / 2 messages.
+    int expectedMessages = (numMessages * (numMessages - 1)) / 2;
     // Flatten de-normalizes the data. So there is separate record for each entry in the array.
-    for (int index = 1; index < numMessages; index++) {
-      expectedMessages = expectedMessages + Math.max(1, index);
-    }
     Assert.assertEquals(expectedMessages, outMessages.size());
+
+    // check that the id column is non-null in every output record
+    Optional<GenericRecord> nullValueRecord = outMessages.stream()
+        .map(x -> (GenericRecord) x.getMessage())
+        .filter(x -> x.get("id") == null)
+        .findFirst();
+    Assert.assertFalse("Null value " +  nullValueRecord.orElse(null), nullValueRecord.isPresent());
+    // TODO: this check currently fails because of the UDF's weak type system; fixing that is beyond the scope of this change.
+   /* // The string_value column is produced by flattening array_values, so each value must appear in that array
+    Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
+      String value = (String) x.get("string_value");
+      List<Object> arrayValues = (List<Object>) x.get("array_values");
+      if (arrayValues == null) {
+        return true;
+      }
+      Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
+      return !notThere.isPresent();
+    }).findFirst();
+    Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
+    */
   }
 
   @Test
-  public void testEndToEndSubQuery() throws Exception {
+  public void testEndToEndSubQuery() {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -635,12 +669,32 @@
 
     List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
 
-    int expectedMessages = 0;
+    // Test invariant: the input row with rank i carries an array_values column with i elements, so the
+    // flattened output has numMessages * (numMessages - 1) / 2 messages.
+    int expectedMessages = (numMessages * (numMessages - 1)) / 2;
     // Flatten de-normalizes the data. So there is separate record for each entry in the array.
-    for (int index = 1; index < numMessages; index++) {
-      expectedMessages = expectedMessages + Math.max(1, index);
-    }
     Assert.assertEquals(expectedMessages, outMessages.size());
+
+    // check that the id column is non-null in every output record
+    Optional<GenericRecord> nullValueRecord = outMessages.stream()
+        .map(x -> (GenericRecord) x.getMessage())
+        .filter(x -> x.get("id") == null)
+        .findFirst();
+    Assert.assertFalse("Null value " +  nullValueRecord.orElse(null), nullValueRecord.isPresent());
+
+    // TODO: this check currently fails because of the UDF's weak type system; fixing that is beyond the scope of this change.
+   /* // The string_value column is produced by flattening array_values, so each value must appear in that array
+    Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
+      String value = (String) x.get("string_value");
+      List<Object> arrayValues = (List<Object>) x.get("array_values");
+      if (arrayValues == null) {
+        return true;
+      }
+      Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
+      return !notThere.isPresent();
+    }).findFirst();
+    Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
+    */
   }
 
   @Test