Merge pull request #1513 from dxichen/merge-state-backend-async-commit

Merge changes from master onto the feature branch.
diff --git a/build.gradle b/build.gradle
index a3512e0..2b98b8a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -127,11 +127,6 @@
     toolVersion = "$checkstyleVersion"
   }
 
-  tasks.withType(ScalaCompile) {
-    // show compile errors in console output
-    logging.setLevel LogLevel.WARN
-  }
-
   tasks.withType(Test) {
     test {
       testLogging {
@@ -196,7 +191,6 @@
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     compile "net.jodah:failsafe:$failsafeVersion"
-    compile "com.linkedin.cytodynamics:cytodynamics-nucleus:$cytodynamicsVersion"
     testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
@@ -221,6 +215,7 @@
 
   dependencies {
     compile "com.azure:azure-storage-blob:12.0.1"
+    compile "com.azure:azure-identity:1.0.0"
     compile "com.microsoft.azure:azure-storage:5.3.1"
     compile "com.microsoft.azure:azure-eventhubs:1.0.1"
     compile "com.fasterxml.jackson.core:jackson-core:$jacksonVersion"
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index b588e64..18a782f 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1912,6 +1912,22 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="stores-rocksdb-max-open-files">stores.<span class="store">store-name</span>.<br>rocksdb.max.open.files</td>
+                    <td class="default">-1</td>
+                    <td class="description">
+                        Limits the number of files that RocksDB can keep open at one time. The default of -1 means no limit.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="stores-rocksdb-max-file-opening-threads">stores.<span class="store">store-name</span>.<br>rocksdb.max.file.opening.threads</td>
+                    <td class="default">16</td>
+                    <td class="description">
+                        Sets the number of threads used to open files when the RocksDB store is opened.
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" id="stores-rocksdb-metrics">stores.<span class="store">store-name</span>.<br>rocksdb.metrics.list</td>
                     <td class="default"></td>
                     <td class="description">
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 596dc52..1ef74cd 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -258,7 +258,7 @@
 #### <a name="advanced-azure-blob-storage"></a>[Advanced Azure Blob Storage Configurations](#advanced-azure-blob-storage)
 |Name|Default|Description|
 |--- |--- |--- |
-|systems.**_system-name_**.azureblob.proxy.use |false|if true, proxy will be used to connect to Azure.|
+|systems.**_system-name_**.azureblob.proxy.use |false|if true, a proxy will be used to connect to Azure for blob creation.|
 |systems.**_system-name_**.azureblob.proxy.hostname| |if proxy.use is true then host name of proxy.|
 |systems.**_system-name_**.azureblob.proxy.port| |if proxy.use is true then port of proxy.|
 |systems.**_system-name_**.azureblob.writer.factory.class|`org.apache.samza.system.`<br>`azureblob.avro.`<br>`AzureBlobAvroWriterFactory`|Fully qualified class name of the `org.apache.samza.system.azureblob.producer.AzureBlobWriter` impl for the system producer.<br><br>The default writer creates blobs that are of type AVRO and require the messages sent to a blob to be AVRO records. The blobs created by the default writer are of type [Block Blobs](https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs#about-block-blobs).|
@@ -272,7 +272,14 @@
 |systems.**_system-name_**.azureblob.closeTimeoutMs|300000 (5 mins)|timeout to finish committing all the blobs currently being written to. This does not include the flush timeout per blob.|
 |systems.**_system-name_**.azureblob.suffixRandomStringToBlobName|true|if true, a random string of 8 chars is suffixed to the blob name to prevent name collision when more than one Samza tasks are writing to the same SSP.|
 |systems.**_system-name_**.azureblob.metadataPropertiesGeneratorFactory|`org.apache.samza.system.`<br>`azureblob.utils.`<br>`NullBlobMetadataGeneratorFactory`|Fully qualified class name of the `org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory` impl for the system producer. <br><br>The default metadata generator does not add any metadata to the blob.| 
-|systems.**_system-name_**.azureblob.metadataGeneratorConfig| |Additional configs for the metadata generator should be prefixed with this string which is passed to the generator.<br>For example, to pass a "key":"value" pair to the metadata generator, add config like systems.<system-name>.azureblob.metadataGeneratorConfig.\<key\> with value \<value\>| 
+|systems.**_system-name_**.azureblob.metadataGeneratorConfig| |Additional configs for the metadata generator should be prefixed with this string which is passed to the generator.<br>For example, to pass a "key":"value" pair to the metadata generator, add config like systems.<system-name>.azureblob.metadataGeneratorConfig.\<key\> with value \<value\>|
+|systems.**_system-name_**.azureblob.useTokenCredentialAuthentication|false|if true, com.azure.core.credential.TokenCredential is used to authenticate with Azure.|
+|systems.**_system-name_**.azureblob.client.id| |if TokenCredential authentication is used, the client id to be used for authentication.|
+|systems.**_system-name_**.azureblob.client.secret| |if TokenCredential authentication is used, the client secret to be used for authentication.|
+|systems.**_system-name_**.azureblob.tenant.id| |if TokenCredential authentication is used, the tenant id to be used for authentication.|
+|systems.**_system-name_**.azureblob.authProxy.use|false|if TokenCredential authentication is used, setting this to true will use a proxy for authentication.|
+|systems.**_system-name_**.azureblob.authProxy.hostname| |if authProxy.use is true then host name of the auth proxy.|
+|systems.**_system-name_**.azureblob.authProxy.port| |if authProxy.use is true then port of the auth proxy.|
 
 
 ### <a name="state-storage"></a>[4. State Storage](#state-storage)
diff --git a/gradle.properties b/gradle.properties
index d7c4cd7..1a6ee97 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -25,3 +25,5 @@
 
 systemProp.file.encoding=utf-8
 checkstyleVersion=6.11.2
+
+org.gradle.parallel=true
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index ebd2d38..019cde3 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -25,7 +25,6 @@
   commonsHttpClientVersion = "3.1"
   commonsIoVersion = "2.8.0"
   commonsLang3Version = "3.11"
-  cytodynamicsVersion = "0.2.0"
   elasticsearchVersion = "2.2.0"
   gsonVersion = "2.8.6"
   guavaVersion = "30.1-jre"
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java
new file mode 100644
index 0000000..b036dc6
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobClientBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.azureblob;
+
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
+import com.azure.core.http.policy.HttpLogDetailLevel;
+import com.azure.core.http.policy.HttpLogOptions;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.storage.blob.BlobServiceAsyncClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import java.net.InetSocketAddress;
+import com.azure.core.http.HttpClient;
+import java.util.Locale;
+
+
+/**
+ * This class provides a method to create the {@link BlobServiceAsyncClient} to be used by the
+ * {@link org.apache.samza.system.azureblob.producer.AzureBlobSystemProducer}. It creates the client based on the
+ * configs given to the SystemProducer - such as which authentication method to use, whether to use a proxy to
+ * authenticate, and so on.
+ */
+public final class AzureBlobClientBuilder {
+  private final String systemName;
+  private final String azureUrlFormat;
+  private final AzureBlobConfig azureBlobConfig;
+  public AzureBlobClientBuilder(String systemName, String azureUrlFormat, AzureBlobConfig azureBlobConfig) {
+    this.systemName = systemName;
+    this.azureUrlFormat = azureUrlFormat;
+    this.azureBlobConfig = azureBlobConfig;
+  }
+
+  /**
+   * Creates a BlobServiceAsyncClient using the configs provided earlier.
+   * If the authentication method is set to {@link TokenCredential} then a {@link com.azure.identity.ClientSecretCredential}
+   * is created and used for the blob client. Otherwise, authentication is done via account name and key using the
+   * {@link StorageSharedKeyCredential}.
+   * The config used to select the authentication method is systems.%s.azureblob.useTokenCredentialAuthentication,
+   * which must be set to true to use TokenCredential.
+   * @return BlobServiceAsyncClient
+   */
+  public BlobServiceAsyncClient getBlobServiceAsyncClient() {
+    BlobServiceClientBuilder blobServiceClientBuilder = getBlobServiceClientBuilder();
+
+    if (azureBlobConfig.getUseTokenCredentialAuthentication(systemName)) {
+      // Use your Azure Blob Storage account's name and client details to create a token credential object to access your account.
+      TokenCredential tokenCredential = getTokenCredential();
+      return blobServiceClientBuilder.credential(tokenCredential).buildAsyncClient();
+    }
+
+    // Use your Azure Blob Storage account's name and key to create a credential object to access your account.
+    StorageSharedKeyCredential storageSharedKeyCredential = getStorageSharedKeyCredential();
+    return blobServiceClientBuilder.credential(storageSharedKeyCredential).buildAsyncClient();
+  }
+
+  /**
+   * Method to get the {@link BlobServiceClientBuilder} used for creating the BlobServiceAsyncClient.
+   * The builder is not given the credential for authentication here.
+   * It is given an endpoint for the Azure Storage account and an HTTP client to be passed on to the
+   * BlobServiceAsyncClient for blob creation.
+   * @return BlobServiceClientBuilder
+   */
+  private BlobServiceClientBuilder getBlobServiceClientBuilder() {
+    // From the Azure portal, get your Storage account blob service AsyncClient endpoint.
+    String endpoint = String.format(Locale.ROOT, azureUrlFormat, azureBlobConfig.getAzureAccountName(systemName));
+
+    HttpLogOptions httpLogOptions = new HttpLogOptions();
+    httpLogOptions.setLogLevel(HttpLogDetailLevel.BASIC);
+
+    BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder()
+        .httpLogOptions(httpLogOptions)
+        .endpoint(endpoint)
+        .httpClient(getHttpClient());
+
+    return blobServiceClientBuilder;
+  }
+
+  /**
+   * Method to create the {@link com.azure.identity.ClientSecretCredential} to be used for authenticating with
+   * Azure Storage. If the config systems.%s.azureblob.authProxy.use is set to true, then authentication happens
+   * via the proxy specified by systems.%s.azureblob.authProxy.hostname and systems.%s.azureblob.authProxy.port configs.
+   * @return ClientSecretCredential which implements {@link TokenCredential}
+   */
+  private TokenCredential getTokenCredential() {
+    ClientSecretCredentialBuilder clientSecretCredentialBuilder = new ClientSecretCredentialBuilder()
+        .clientId(azureBlobConfig.getAzureClientId(systemName))
+        .clientSecret(azureBlobConfig.getAzureClientSecret(systemName))
+        .tenantId(azureBlobConfig.getAzureTenantId(systemName));
+
+    if (azureBlobConfig.getUseAuthProxy(systemName)) {
+      return clientSecretCredentialBuilder
+          .proxyOptions(new ProxyOptions(ProxyOptions.Type.HTTP,
+              new InetSocketAddress(azureBlobConfig.getAuthProxyHostName(systemName),
+                  azureBlobConfig.getAuthProxyPort(systemName))))
+          .build();
+    }
+    return clientSecretCredentialBuilder
+        .build();
+  }
+
+  /**
+   * Method to create the {@link StorageSharedKeyCredential} to be used for authenticating with Azure Storage.
+   * @return StorageSharedKeyCredential
+   */
+  private StorageSharedKeyCredential getStorageSharedKeyCredential() {
+    return new StorageSharedKeyCredential(azureBlobConfig.getAzureAccountName(systemName),
+        azureBlobConfig.getAzureAccountKey(systemName));
+  }
+
+  /**
+   * Method to create {@link HttpClient} to be used by the {@link BlobServiceAsyncClient} while creating blobs
+   * in Azure Storage. If the config systems.%s.azureblob.proxy.use is set to true then the HTTP client
+   * uses the proxy specified by the systems.%s.azureblob.proxy.hostname and systems.%s.azureblob.proxy.port configs.
+   * @return HttpClient
+   */
+  private HttpClient getHttpClient() {
+    HttpClient httpClient;
+    if (azureBlobConfig.getUseBlobProxy(systemName)) {
+      httpClient = new NettyAsyncHttpClientBuilder()
+          .proxy(new ProxyOptions(ProxyOptions.Type.HTTP,
+              new InetSocketAddress(azureBlobConfig.getAzureBlobProxyHostname(systemName),
+                  azureBlobConfig.getAzureBlobProxyPort(systemName))))
+          .build();
+    } else {
+      httpClient = HttpClient.createDefault();
+    }
+    return httpClient;
+  }
+}
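
For context, the producer change later in this patch (AzureBlobSystemProducer.setupAzureContainer) reduces client creation to a single builder call. A minimal sketch of that usage follows, assuming AzureBlobConfig wraps an existing Config; the system name and URL format string are placeholders.

```java
import com.azure.storage.blob.BlobServiceAsyncClient;
import org.apache.samza.config.Config;
import org.apache.samza.system.azureblob.AzureBlobClientBuilder;
import org.apache.samza.system.azureblob.AzureBlobConfig;

public class AzureBlobClientBuilderUsage {
  /**
   * Sketch of how AzureBlobSystemProducer obtains its async client through the new builder.
   * "azure-blob-system" and the URL format are illustrative placeholders.
   */
  static BlobServiceAsyncClient buildClient(Config config) {
    AzureBlobConfig azureBlobConfig = new AzureBlobConfig(config);
    return new AzureBlobClientBuilder("azure-blob-system", "https://%s.blob.core.windows.net", azureBlobConfig)
        .getBlobServiceAsyncClient();
  }
}
```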
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
index 58f206e..3026a89 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobConfig.java
@@ -33,11 +33,27 @@
   public static final String AZURE_BLOB_LOG_SLOW_REQUESTS_MS = "samza.azureblob.log.slowRequestMs";
   private static final long AZURE_BLOB_LOG_SLOW_REQUESTS_MS_DEFAULT = Duration.ofSeconds(30).toMillis();
 
+  // Azure authentication - via account name and key, or via ClientSecretCredential
   // system Level Properties.
   // fully qualified class name of the AzureBlobWriter impl for the producer system
   public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME = SYSTEM_AZUREBLOB_PREFIX + "writer.factory.class";
   public static final String SYSTEM_WRITER_FACTORY_CLASS_NAME_DEFAULT = "org.apache.samza.system.azureblob.avro.AzureBlobAvroWriterFactory";
 
+  public static final String SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "useTokenCredentialAuthentication";
+  private static final boolean SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION_DEFAULT = false;
+
+  // ClientSecretCredential needs client id, client secret and tenant id
+  public static final String SYSTEM_AZURE_CLIENT_ID = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "client.id";
+  public static final String SYSTEM_AZURE_CLIENT_SECRET = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "client.secret";
+  public static final String SYSTEM_AZURE_TENANT_ID = Config.SENSITIVE_PREFIX + SYSTEM_AZUREBLOB_PREFIX + "tenant.id";
+  // Whether to use proxy while authenticating with Azure
+  public static final String SYSTEM_AZURE_USE_AUTH_PROXY = SYSTEM_AZUREBLOB_PREFIX + "authProxy.use";
+  public static final boolean SYSTEM_AZURE_USE_AUTH_PROXY_DEFAULT = false;
+
+  // name of the host to be used as auth proxy
+  public static final String SYSTEM_AZURE_AUTH_PROXY_HOSTNAME = SYSTEM_AZUREBLOB_PREFIX + "authProxy.hostname";
+  // port in the auth proxy host to be used
+  public static final String SYSTEM_AZURE_AUTH_PROXY_PORT = SYSTEM_AZUREBLOB_PREFIX + "authProxy.port";
   // Azure Storage Account name under which the Azure container representing this system is.
   // System name = Azure container name
   // (https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#container-names)
@@ -101,7 +117,6 @@
   public static final String SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY = SYSTEM_AZUREBLOB_PREFIX + "metadataPropertiesGeneratorFactory";
   private static final String SYSTEM_BLOB_METADATA_PROPERTIES_GENERATOR_FACTORY_DEFAULT =
       "org.apache.samza.system.azureblob.utils.NullBlobMetadataGeneratorFactory";
-
   // Additional configs for the metadata generator should be prefixed with this string which is passed to the generator.
   // for example, to pass a "key":"value" pair to the metadata generator, add config like
   // systems.<system-name>.azureblob.metadataGeneratorConfig.<key> with value <value>
@@ -127,11 +142,11 @@
     return accountName;
   }
 
-  public boolean getUseProxy(String systemName) {
+  public boolean getUseBlobProxy(String systemName) {
     return getBoolean(String.format(SYSTEM_AZURE_USE_PROXY, systemName), SYSTEM_AZURE_USE_PROXY_DEFAULT);
   }
 
-  public String getAzureProxyHostname(String systemName) {
+  public String getAzureBlobProxyHostname(String systemName) {
     String hostname = get(String.format(SYSTEM_AZURE_PROXY_HOSTNAME, systemName));
     if (hostname == null) {
       throw new ConfigException("Azure proxy host name is required.");
@@ -139,7 +154,7 @@
     return hostname;
   }
 
-  public int getAzureProxyPort(String systemName) {
+  public int getAzureBlobProxyPort(String systemName) {
     return getInt(String.format(SYSTEM_AZURE_PROXY_PORT, systemName));
   }
 
@@ -207,4 +222,50 @@
   public Config getSystemBlobMetadataGeneratorConfigs(String systemName) {
     return subset(String.format(SYSTEM_BLOB_METADATA_GENERATOR_CONFIG_PREFIX, systemName));
   }
+
+  public boolean getUseTokenCredentialAuthentication(String systemName) {
+    return getBoolean(String.format(SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION, systemName),
+        SYSTEM_USE_TOKEN_CREDENTIAL_AUTHENTICATION_DEFAULT);
+  }
+
+  public boolean getUseAuthProxy(String systemName) {
+    return getBoolean(String.format(SYSTEM_AZURE_USE_AUTH_PROXY, systemName), SYSTEM_AZURE_USE_AUTH_PROXY_DEFAULT);
+  }
+
+  public String getAuthProxyHostName(String systemName) {
+    String hostname = get(String.format(SYSTEM_AZURE_AUTH_PROXY_HOSTNAME, systemName));
+    if (hostname == null) {
+      throw new ConfigException("Azure proxy host name is required.");
+    }
+    return hostname;
+  }
+
+  public int getAuthProxyPort(String systemName) {
+    return getInt(String.format(SYSTEM_AZURE_AUTH_PROXY_PORT, systemName));
+  }
+
+  public String getAzureClientId(String systemName) {
+    String clientId = get(String.format(SYSTEM_AZURE_CLIENT_ID, systemName));
+    if (clientId == null) {
+      throw new ConfigException("Azure Client id is required.");
+    }
+    return clientId;
+  }
+
+  public String getAzureClientSecret(String systemName) {
+    String clientSecret = get(String.format(SYSTEM_AZURE_CLIENT_SECRET, systemName));
+    if (clientSecret == null) {
+      throw new ConfigException("Azure Client secret is required.");
+    }
+    return clientSecret;
+  }
+
+
+  public String getAzureTenantId(String systemName) {
+    String tenantId = get(String.format(SYSTEM_AZURE_TENANT_ID, systemName));
+    if (tenantId == null) {
+      throw new ConfigException("Azure tenant id is required.");
+    }
+    return tenantId;
+  }
 }
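
A small sketch of how the new getters behave: when TokenCredential authentication is enabled, each credential getter throws a ConfigException if its key is unset, so misconfiguration surfaces at producer start-up rather than during blob upload. The helper below is illustrative, not part of the patch; the system name is a placeholder.

```java
import org.apache.samza.config.Config;
import org.apache.samza.system.azureblob.AzureBlobConfig;

public class AzureBlobCredentialConfigCheck {
  /**
   * Illustrative helper: each getter below throws a ConfigException when its key is not set,
   * so a missing client id, client secret, or tenant id is reported as soon as this runs.
   */
  static void requireTokenCredentialConfigs(Config config) {
    AzureBlobConfig azureBlobConfig = new AzureBlobConfig(config);
    if (azureBlobConfig.getUseTokenCredentialAuthentication("azure-blob-system")) {
      azureBlobConfig.getAzureClientId("azure-blob-system");
      azureBlobConfig.getAzureClientSecret("azure-blob-system");
      azureBlobConfig.getAzureTenantId("azure-blob-system");
    }
  }
}
```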
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
index e615808..222b99c 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java
@@ -273,7 +273,7 @@
 
   // SAMZA-2476 stubbing BlockBlobAsyncClient.stageBlock was causing flaky tests.
   @VisibleForTesting
-  void stageBlock(String blockIdEncoded, ByteBuffer outputStream, int blockSize) {
+  void stageBlock(String blockIdEncoded, ByteBuffer outputStream, int blockSize) throws InterruptedException {
     blobAsyncClient.stageBlock(blockIdEncoded, Flux.just(outputStream), blockSize).block();
   }
 
@@ -335,6 +335,11 @@
             // StageBlock generates exception on Failure.
             stageBlock(blockIdEncoded, outputStream, blockSize);
             break;
+          } catch (InterruptedException e) {
+            String msg = String.format("Upload block for blob: %s failed for blockid: %s due to InterruptedException.",
+                blobAsyncClient.getBlobUrl().toString(), blockId);
+            LOG.error(msg, e);
+            throw new AzureException("InterruptedException encountered during block upload. Will not retry.", e);
           } catch (Exception e) {
             attemptCount += 1;
             String msg = "Upload block for blob: " + blobAsyncClient.getBlobUrl().toString()
diff --git a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
index d89f38f..781dce4 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.java
@@ -19,27 +19,16 @@
 
 package org.apache.samza.system.azureblob.producer;
 
-import com.azure.core.http.HttpClient;
 import com.azure.core.http.HttpResponse;
-import com.azure.core.http.ProxyOptions;
-import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
-import com.azure.core.http.policy.HttpLogDetailLevel;
-import com.azure.core.http.policy.HttpLogOptions;
 import com.azure.core.util.Configuration;
 import com.azure.storage.blob.BlobContainerAsyncClient;
 import com.azure.storage.blob.BlobServiceAsyncClient;
-import com.azure.storage.blob.BlobServiceClientBuilder;
 import com.azure.storage.blob.models.BlobErrorCode;
 import com.azure.storage.blob.models.BlobStorageException;
 import com.azure.storage.blob.models.SkuName;
-import com.azure.storage.common.StorageSharedKeyCredential;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.samza.system.azureblob.AzureBlobConfig;
-import org.apache.samza.system.azureblob.compression.CompressionFactory;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -55,6 +44,9 @@
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemProducerException;
+import org.apache.samza.system.azureblob.AzureBlobClientBuilder;
+import org.apache.samza.system.azureblob.AzureBlobConfig;
+import org.apache.samza.system.azureblob.compression.CompressionFactory;
 import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -196,11 +188,7 @@
       LOG.warn("Attempting to start an already started producer.");
       return;
     }
-
-    String accountName = config.getAzureAccountName(systemName);
-    String accountKey = config.getAzureAccountKey(systemName);
-
-    setupAzureContainer(accountName, accountKey);
+    setupAzureContainer();
 
     LOG.info("Starting producer.");
     isStarted = true;
@@ -354,47 +342,11 @@
   }
 
   @VisibleForTesting
-  void setupAzureContainer(String accountName, String accountKey) {
+  void setupAzureContainer() {
     try {
-      // Use your Azure Blob Storage account's name and key to create a credential object to access your account.
-      StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
-
-      HttpClient httpClient;
-      if (config.getUseProxy(systemName)) {
-        LOG.info("HTTP Proxy setup for AzureBlob pipeline");
-        httpClient = new NettyAsyncHttpClientBuilder()
-            .proxy(new ProxyOptions(ProxyOptions.Type.HTTP,
-            new InetSocketAddress(config.getAzureProxyHostname(systemName), config.getAzureProxyPort(systemName)))).build();
-      } else {
-        httpClient = HttpClient.createDefault();
-      }
-
-      // From the Azure portal, get your Storage account blob service AsyncClient endpoint.
-      String endpoint = String.format(Locale.ROOT, AZURE_URL, accountName);
-
-      HttpLogOptions httpLogOptions = new HttpLogOptions();
-      httpLogOptions.setLogLevel(HttpLogDetailLevel.BASIC);
-      BlobServiceAsyncClient storageClient =
-          new BlobServiceClientBuilder()
-          .httpLogOptions(httpLogOptions)
-          .endpoint(endpoint)
-          .credential(credential)
-          .httpClient(httpClient)
-          .buildAsyncClient();
-
-
-      SkuName accountType = storageClient.getAccountInfo().block().getSkuName();
-      long flushThresholdSize = config.getMaxFlushThresholdSize(systemName);
-      boolean isPremiumAccount = SkuName.PREMIUM_LRS == accountType;
-      if (isPremiumAccount && flushThresholdSize > PREMIUM_MAX_BLOCK_SIZE) { // 100 MB
-        throw new SystemProducerException("Azure storage account with name: " + accountName
-            + " is a premium account and can only handle upto " +  PREMIUM_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is "
-            + flushThresholdSize);
-      } else if (!isPremiumAccount && flushThresholdSize > STANDARD_MAX_BLOCK_SIZE) { // STANDARD account
-        throw new SystemProducerException("Azure storage account with name: " + accountName
-            + " is a standard account and can only handle upto " + STANDARD_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is "
-            + flushThresholdSize);
-      }
+      BlobServiceAsyncClient storageClient = new AzureBlobClientBuilder(systemName, AZURE_URL, config)
+          .getBlobServiceAsyncClient();
+      validateFlushThresholdSizeSupported(storageClient);
 
       containerAsyncClient = storageClient.getBlobContainerAsyncClient(systemName);
 
@@ -406,6 +358,22 @@
     }
   }
 
+  void validateFlushThresholdSizeSupported(BlobServiceAsyncClient storageClient) {
+    SkuName accountType = storageClient.getAccountInfo().block().getSkuName();
+    String accountName = storageClient.getAccountName();
+    long flushThresholdSize = config.getMaxFlushThresholdSize(systemName);
+    boolean isPremiumAccount = SkuName.PREMIUM_LRS == accountType;
+    if (isPremiumAccount && flushThresholdSize > PREMIUM_MAX_BLOCK_SIZE) { // 100 MB
+      throw new SystemProducerException("Azure storage account with name: " + accountName
+          + " is a premium account and can only handle up to " + PREMIUM_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is "
+          + flushThresholdSize);
+    } else if (!isPremiumAccount && flushThresholdSize > STANDARD_MAX_BLOCK_SIZE) { // STANDARD account
+      throw new SystemProducerException(
+          "Azure storage account with name: " + accountName + " is a standard account and can only handle up to " + STANDARD_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is "
+              + flushThresholdSize);
+    }
+  }
+
   /**
    * // find the writer in the writerMap else create one
    * @param source for which to find/create the writer
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
index b713ec7..4412edf 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/avro/TestAzureBlobOutputStream.java
@@ -127,7 +127,7 @@
   }
 
   @Test
-  public void testWrite() {
+  public void testWrite() throws InterruptedException {
     byte[] b = new byte[THRESHOLD - 10];
     azureBlobOutputStream.write(b, 0, THRESHOLD - 10);
     verify(azureBlobOutputStream, never()).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
@@ -136,7 +136,7 @@
   }
 
   @Test
-  public void testWriteLargerThanThreshold() {
+  public void testWriteLargerThanThreshold() throws InterruptedException {
     byte[] largeRecord = RANDOM_STRING.substring(0, 2 * THRESHOLD).getBytes();
     byte[] largeRecordFirstHalf = RANDOM_STRING.substring(0, THRESHOLD).getBytes();
     byte[] largeRecordSecondHalf = RANDOM_STRING.substring(THRESHOLD, 2 * THRESHOLD).getBytes();
@@ -165,7 +165,7 @@
   }
 
   @Test
-  public void testWriteLargeRecordWithSmallRecordInBuffer() {
+  public void testWriteLargeRecordWithSmallRecordInBuffer() throws InterruptedException {
     byte[] halfBlock = new byte[THRESHOLD / 2];
     byte[] fullBlock = new byte[THRESHOLD];
     byte[] largeRecord = new byte[2 * THRESHOLD];
@@ -229,6 +229,36 @@
     azureBlobOutputStream.close();
   }
 
+  @Test(expected = AzureException.class)
+  public void testWriteFailedInterruptedException() throws InterruptedException {
+
+    doThrow(new InterruptedException("Lets interrupt the thread"))
+        .when(azureBlobOutputStream).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
+    byte[] b = new byte[100];
+    doReturn(COMPRESSED_BYTES).when(mockCompression).compress(b);
+
+    try {
+      azureBlobOutputStream.write(b, 0, THRESHOLD); // threshold crossed so stageBlock is scheduled.
+      // azureBlobOutputStream.close waits on the CompletableFuture which does the actual stageBlock in uploadBlockAsync
+      azureBlobOutputStream.close();
+    } catch (AzureException exception) {
+      // get root cause of the exception - to confirm its an InterruptedException
+      Throwable dupException = exception;
+      while (dupException.getCause() != null && dupException.getCause() != dupException) {
+        dupException = dupException.getCause();
+      }
+
+      Assert.assertTrue(dupException.getClass().getName().equals(InterruptedException.class.getCanonicalName()));
+      Assert.assertEquals("Lets interrupt the thread", dupException.getMessage());
+
+      // verify stageBlock was called exactly once - aka no retries happen when interrupted exception is thrown
+      verify(azureBlobOutputStream).stageBlock(anyString(), any(ByteBuffer.class), anyInt());
+
+      // rethrow the exception so that the test will fail if no exception was thrown in the try block
+      throw exception;
+    }
+  }
+
   @Test
   public void testClose() {
     azureBlobOutputStream.write(BYTES, 0, THRESHOLD);
@@ -278,7 +308,7 @@
   }
 
   @Test(expected = AzureException.class)
-  public void testCloseFailed() {
+  public void testCloseFailed() throws InterruptedException {
 
     azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
         blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
@@ -320,7 +350,7 @@
   }
 
   @Test (expected = AzureException.class)
-  public void testFlushFailed() throws IOException {
+  public void testFlushFailed() throws IOException, InterruptedException {
     azureBlobOutputStream = spy(new AzureBlobOutputStream(mockBlobAsyncClient, threadPool, mockMetrics,
         blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, FAKE_STREAM,
         60000, THRESHOLD, mockByteArrayOutputStream, mockCompression));
diff --git a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
index acf4cfb..a364cf6 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/azureblob/producer/TestAzureBlobSystemProducer.java
@@ -98,14 +98,14 @@
     // use mock writer impl
     setupWriterForProducer(systemProducer, mockAzureWriter, STREAM);
     // bypass Azure connection setup
-    doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+    doNothing().when(systemProducer).setupAzureContainer();
   }
 
   @Test
   public void testStart() {
 
     systemProducer.start();
-    verify(systemProducer).setupAzureContainer(ACCOUNT_NAME, ACCOUNT_KEY);
+    verify(systemProducer).setupAzureContainer();
   }
 
   public void testMultipleStart() {
@@ -264,7 +264,7 @@
         mockMetricsRegistry));
     PowerMockito.whenNew(AzureBlobAvroWriter.class).withAnyArguments().thenThrow(new SystemProducerException("Failed"));
     // bypass Azure connection setup
-    doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+    doNothing().when(systemProducer).setupAzureContainer();
 
     systemProducer.register(SOURCE);
     systemProducer.start();
@@ -315,7 +315,7 @@
 
     AzureBlobSystemProducer systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
     // bypass Azure connection setup
-    doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+    doNothing().when(systemProducer).setupAzureContainer();
 
     doReturn(mockAzureWriter1).when(systemProducer).getOrCreateWriter(source1, ome1);
     doReturn(mockAzureWriter2).when(systemProducer).getOrCreateWriter(source2, ome2);
@@ -359,7 +359,7 @@
 
     AzureBlobSystemProducer systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
     // bypass Azure connection setup
-    doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+    doNothing().when(systemProducer).setupAzureContainer();
 
     doReturn(mockAzureWriter1).when(systemProducer).getOrCreateWriter(source1, ome1);
     doReturn(mockAzureWriter2).when(systemProducer).getOrCreateWriter(source2, ome2);
@@ -411,7 +411,7 @@
 
     AzureBlobSystemProducer systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
     // bypass Azure connection setup
-    doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+    doNothing().when(systemProducer).setupAzureContainer();
 
     systemProducer.register(source1);
     systemProducer.start();
@@ -450,7 +450,7 @@
 
     AzureBlobSystemProducer systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
     // bypass Azure connection setup
-    doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+    doNothing().when(systemProducer).setupAzureContainer();
 
     setupWriterForProducer(systemProducer, mockAzureWriter1, stream1);
 
@@ -492,7 +492,7 @@
 
     AzureBlobSystemProducer systemProducer = spy(new AzureBlobSystemProducer(SYSTEM_NAME, azureBlobConfig, mockMetricsRegistry));
     // bypass Azure connection setup
-    doNothing().when(systemProducer).setupAzureContainer(anyString(), anyString());
+    doNothing().when(systemProducer).setupAzureContainer();
 
     setupWriterForProducer(systemProducer, mockAzureWriter1, STREAM);
 
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java b/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
deleted file mode 100644
index a0b5d1e..0000000
--- a/samza-core/src/main/java/org/apache/samza/classloader/DependencyIsolationUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-public class DependencyIsolationUtils {
-  /**
-   * Directory inside the home directory of the cluster-based job coordinator in which the framework API artifacts are
-   * placed, for usage in dependency isolation for the cluster-based job coordinator.
-   * TODO make this configurable or taken from an environment variable
-   */
-  public static final String FRAMEWORK_API_DIRECTORY = "__samzaFrameworkApi";
-
-  /**
-   * Directory inside the home directory of the cluster-based job coordinator in which the framework infrastructure
-   * artifacts are placed, for usage in dependency isolation for the cluster-based job coordinator.
-   * TODO make this configurable or taken from an environment variable
-   */
-  public static final String FRAMEWORK_INFRASTRUCTURE_DIRECTORY = "__samzaFrameworkInfrastructure";
-
-  /**
-   * Directory inside the home directory of the cluster-based job coordinator in which the application artifacts are
-   * placed, for usage in dependency isolation for the cluster-based job coordinator.
-   * TODO make this configurable or taken from an environment variable
-   */
-  public static final String APPLICATION_DIRECTORY = "__package";
-
-  /**
-   * Name of the file which contains the class names (or globs) which should be loaded from the framework API
-   * classloader.
-   */
-  public static final String FRAMEWORK_API_CLASS_LIST_FILE_NAME = "samza-framework-api-classes.txt";
-
-  public static final String RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME = "runtime-framework-resources-pathing.jar";
-}
diff --git a/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java b/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
deleted file mode 100644
index 344a034..0000000
--- a/samza-core/src/main/java/org/apache/samza/classloader/IsolatingClassLoaderFactory.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-import com.linkedin.cytodynamics.matcher.BootstrapClassPredicate;
-import com.linkedin.cytodynamics.matcher.GlobMatcher;
-import com.linkedin.cytodynamics.nucleus.DelegateRelationship;
-import com.linkedin.cytodynamics.nucleus.DelegateRelationshipBuilder;
-import com.linkedin.cytodynamics.nucleus.IsolationLevel;
-import com.linkedin.cytodynamics.nucleus.LoaderBuilder;
-import com.linkedin.cytodynamics.nucleus.OriginRestriction;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Use this to build a classloader for running Samza which isolates the Samza framework code/dependencies from the
- * application code/dependencies.
- */
-public class IsolatingClassLoaderFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(IsolatingClassLoaderFactory.class);
-
-  private static final String LIB_DIRECTORY = "lib";
-
-  /**
-   * Build a classloader which will isolate Samza framework code from application code. Samza framework classes and
-   * application-specific classes will be loaded using a different classloaders. This will enable dependencies of each
-   * category of classes to also be loaded separately, so that runtime dependency conflicts do not happen.
-   * Each call to this method will build a different instance of a classloader.
-   *
-   * Samza framework API classes need to be specified in a file called
-   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} which is in the lib directory which is in the
-   * API package. The file needs to be generated when building the framework API package. This class will not generate
-   * the file.
-   *
-   * Implementation notes:
-   *
-   * The cytodynamics isolating classloader is used for this. It provides more control than the built-in
-   * {@link URLClassLoader}. Cytodynamics provides the ability to compose multiple classloaders together and have more
-   * granular delegation strategies between the classloaders.
-   *
-   * In order to share objects between classes loaded by different classloaders, the classes for the shared objects must
-   * be loaded by a common classloader. Those common classes will be loaded through a common API classloader. The
-   * cytodynamics classloader can be set up to only use the common API classloader for an explicit set of classes. The
-   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file should include the framework API classes.
-   * Also, bootstrap classes (e.g. java.lang.String) need to be loaded by a common classloader, since objects of those
-   * types need to be shared across different framework and application. There are also some static bootstrap classes
-   * which should be shared (e.g. java.lang.System). Bootstrap classes will be loaded through a common classloader by
-   * default.
-   *
-   * These are the classloaders which are used to make up the final classloader.
-   * <ul>
-   *   <li>bootstrap classloader: Built-in Java classes (e.g. java.lang.String)</li>
-   *   <li>API classloader: Common Samza framework API classes</li>
-   *   <li>infrastructure classloader: Core Samza framework classes and plugins that are included in the framework</li>
-   *   <li>
-   *     application classloader: Application code and plugins that are needed in the app but are not included in the
-   *     framework
-   *   </li>
-   * </ul>
-   *
-   * This is the delegation structure for the classloaders:
-   * <pre>
-   *   (bootstrap               (API                  (application
-   *   classloader) &lt;---- classloader) &lt;------- classloader)
-   *                             ^                      ^
-   *                             |                     /
-   *                             |                    /
-   *                             |                   /
-   *                             |                  /
-   *                         (infrastructure classloader)
-   * </pre>
-   * The cytodynamics classloader allows control over when the delegation should happen.
-   * <ol>
-   *   <li>API classloader delegates to the bootstrap classloader if the bootstrap classloader has the class.</li>
-   *   <li>
-   *     Infrastructure classloader only delegates to the API classloader for the common classes specified by
-   *     {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
-   *   </li>
-   *   <li>
-   *     Infrastructure classloader delegates to the application classloader when a class can't be found in the
-   *     infrastructure classloader.
-   *   </li>
-   *   <li>
-   *     Application classloader only delegates to the API classloader for the common classes specified by
-   *     {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME}.
-   *   </li>
-   * </ol>
-   */
-  public ClassLoader buildClassLoader() {
-    // start at the user.dir to find the resources for the classpaths
-    File baseJobDirectory = new File(System.getProperty("user.dir"));
-    File apiLibDirectory = libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.FRAMEWORK_API_DIRECTORY));
-    LOG.info("Using API lib directory: {}", apiLibDirectory);
-    File infrastructureLibDirectory =
-        libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY));
-    LOG.info("Using infrastructure lib directory: {}", infrastructureLibDirectory);
-    File applicationLibDirectory =
-        libDirectory(new File(baseJobDirectory, DependencyIsolationUtils.APPLICATION_DIRECTORY));
-    LOG.info("Using application lib directory: {}", applicationLibDirectory);
-
-    ClassLoader apiClassLoader = buildApiClassLoader(apiLibDirectory);
-    ClassLoader applicationClassLoader =
-        buildApplicationClassLoader(applicationLibDirectory, apiLibDirectory, apiClassLoader);
-
-    // the classloader to return is the one with the infrastructure classpath
-    return buildInfrastructureClassLoader(infrastructureLibDirectory, baseJobDirectory, apiLibDirectory, apiClassLoader,
-        applicationClassLoader);
-  }
-
-  /**
-   * Build the {@link ClassLoader} which can load framework API classes.
-   *
-   * This sets up the link between the bootstrap classloader and the API classloader (see {@link #buildClassLoader()}.
-   */
-  private static ClassLoader buildApiClassLoader(File apiLibDirectory) {
-    /*
-     * This can just use the built-in classloading, which checks the parent classloader first and then checks its own
-     * classpath. A null parent means bootstrap classloader, which contains core Java classes (e.g. java.lang.String).
-     * This doesn't need to be isolated from the parent, because we only want to load all bootstrap classes from the
-     * bootstrap classloader.
-     */
-    return new URLClassLoader(getClasspathAsURLs(apiLibDirectory), null);
-  }
-
-  /**
-   * Build the {@link ClassLoader} which can load application classes.
-   *
-   * This sets up the link between the application classloader and the API classloader (see {@link #buildClassLoader()}.
-   */
-  private static ClassLoader buildApplicationClassLoader(File applicationLibDirectory, File apiLibDirectory,
-      ClassLoader apiClassLoader) {
-    return LoaderBuilder.anIsolatingLoader()
-        // look in application lib directory for JARs
-        .withClasspath(getClasspathAsURIs(applicationLibDirectory))
-        // getClasspathAsURIs should only return JARs within applicationLibDirectory anyways, but doing it to be safe
-        .withOriginRestriction(OriginRestriction.denyByDefault().allowingDirectory(applicationLibDirectory, false))
-        // delegate to the api classloader for API classes
-        .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
-        .build();
-  }
-
-  /**
-   * Build the {@link ClassLoader} which can load Samza framework core classes. If a file with the name
-   * {@link DependencyIsolationUtils#RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME} is found in {@code baseJobDirectory},
-   * then it will be included in the classpath.
-   * This may also fall back to loading application classes.
-   *
-   * This sets up two links: One link between the infrastructure classloader and the API and another link between the
-   * infrastructure classloader and the application classloader (see {@link #buildClassLoader()}.
-   */
-  private static ClassLoader buildInfrastructureClassLoader(File infrastructureLibDirectory,
-      File baseJobDirectory,
-      File apiLibDirectory,
-      ClassLoader apiClassLoader,
-      ClassLoader applicationClassLoader) {
-    // start with JARs in infrastructure lib directory
-    List<URI> classpathURIs = new ArrayList<>(getClasspathAsURIs(infrastructureLibDirectory));
-    OriginRestriction originRestriction = OriginRestriction.denyByDefault()
-        // getClasspathAsURIs should only return JARs within infrastructureLibDirectory anyways, but doing it to be safe
-        .allowingDirectory(infrastructureLibDirectory, false);
-    File runtimeFrameworkResourcesPathingJar =
-        new File(baseJobDirectory, DependencyIsolationUtils.RUNTIME_FRAMEWORK_RESOURCES_PATHING_JAR_NAME);
-    if (canAccess(runtimeFrameworkResourcesPathingJar)) {
-      // if there is a runtime framework resources pathing JAR, then include that in the classpath as well
-      classpathURIs.add(runtimeFrameworkResourcesPathingJar.toURI());
-      originRestriction.allowingGlobPattern(fileURL(runtimeFrameworkResourcesPathingJar).toExternalForm());
-      LOG.info("Added {} to infrastructure classpath", runtimeFrameworkResourcesPathingJar.getPath());
-    } else {
-      LOG.info("Unable to access {}, so not adding to infrastructure classpath",
-          runtimeFrameworkResourcesPathingJar.getPath());
-    }
-    return LoaderBuilder.anIsolatingLoader()
-        .withClasspath(Collections.unmodifiableList(classpathURIs))
-        .withOriginRestriction(originRestriction)
-        .withParentRelationship(buildApiParentRelationship(apiLibDirectory, apiClassLoader))
-        /*
-         * Fall back to the application classloader for certain classes. For example, the application might implement
-         * some pluggable classes (e.g. SystemFactory). Another example is message schemas that are supplied by the
-         * application.
-         */
-        .addFallbackDelegate(DelegateRelationshipBuilder.builder()
-            .withDelegateClassLoader(applicationClassLoader)
-            /*
-             * NONE means that a class will be loaded from here if it is not found in the classpath of the loader that uses
-             * this relationship.
-             */
-            .withIsolationLevel(IsolationLevel.NONE)
-            .build())
-        .build();
-  }
-
-  /**
-   * Build a {@link DelegateRelationship} which defines how to delegate to the API classloader.
-   *
-   * Delegation will only happen for classes specified in
-   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} and the Java bootstrap classes.
-   */
-  private static DelegateRelationship buildApiParentRelationship(File apiLibDirectory, ClassLoader apiClassLoader) {
-    DelegateRelationshipBuilder apiParentRelationshipBuilder = DelegateRelationshipBuilder.builder()
-        // needs to load API classes from the API classloader
-        .withDelegateClassLoader(apiClassLoader)
-        /*
-         * FULL means to only load classes explicitly specified as "API" from the API classloader. We will use
-         * delegate-preferred class predicates to specify which classes are "API" (see below).
-         */
-        .withIsolationLevel(IsolationLevel.FULL);
-
-    // bootstrap classes need to be loaded from a common classloader
-    apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new BootstrapClassPredicate());
-    // the classes which are Samza framework API classes are added here
-    getFrameworkApiClassGlobs(apiLibDirectory).forEach(
-      apiClassName -> apiParentRelationshipBuilder.addDelegatePreferredClassPredicate(new GlobMatcher(apiClassName)));
-    return apiParentRelationshipBuilder.build();
-  }
-
-  /**
-   * Gets the globs for matching against classes to load from the framework API classloader. This will read the
-   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} file in {@code directoryWithClassList} to get
-   * the globs.
-   *
-   * @param directoryWithClassList Directory in which
-   * {@link DependencyIsolationUtils#FRAMEWORK_API_CLASS_LIST_FILE_NAME} lives
-   * @return {@link List} of globs for matching against classes to load from the framework API classloader
-   */
-  @VisibleForTesting
-  static List<String> getFrameworkApiClassGlobs(File directoryWithClassList) {
-    File parentPreferredFile =
-        new File(directoryWithClassList, DependencyIsolationUtils.FRAMEWORK_API_CLASS_LIST_FILE_NAME);
-    validateCanAccess(parentPreferredFile);
-    try {
-      return Files.readAllLines(Paths.get(parentPreferredFile.toURI()), StandardCharsets.UTF_8)
-          .stream()
-          .filter(StringUtils::isNotBlank)
-          .collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new SamzaException("Error while reading samza-api class list", e);
-    }
-  }
-
-  /**
-   * Get the {@link URL}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
-   * not recursive.
-   */
-  @VisibleForTesting
-  static URL[] getClasspathAsURLs(File jarsLocation) {
-    validateCanAccess(jarsLocation);
-    File[] filesInJarsLocation = jarsLocation.listFiles();
-    if (filesInJarsLocation == null) {
-      throw new SamzaException(
-          String.format("Could not find any files inside %s, probably because it is not a directory",
-              jarsLocation.getPath()));
-    }
-    URL[] urls = Stream.of(filesInJarsLocation)
-        .filter(file -> file.getName().endsWith(".jar") || file.getName().endsWith(".war"))
-        .map(IsolatingClassLoaderFactory::fileURL)
-        .toArray(URL[]::new);
-    LOG.info("Found {} items to load into classpath from {}", urls.length, jarsLocation);
-    Stream.of(urls).forEach(url -> LOG.debug("Found {} from {}", url, jarsLocation));
-    return urls;
-  }
-
-  /**
-   * Get the {@link URI}s of all JARs/WARs in the directory {@code jarsLocation}. This only looks one level down; it is
-   * not recursive.
-   */
-  @VisibleForTesting
-  static List<URI> getClasspathAsURIs(File jarsLocation) {
-    return Stream.of(getClasspathAsURLs(jarsLocation))
-        .map(IsolatingClassLoaderFactory::urlToURI)
-        .collect(Collectors.toList());
-  }
-
-  private static boolean canAccess(File file) {
-    return file.exists() && file.canRead();
-  }
-
-  /**
-   * Makes sure that a file exists and can be read.
-   */
-  private static void validateCanAccess(File file) {
-    if (!canAccess(file)) {
-      throw new SamzaException("Unable to access file: " + file);
-    }
-  }
-
-  /**
-   * Get the {@link URL} for a {@link File}.
-   * Converts checked exceptions into {@link SamzaException}s.
-   */
-  private static URL fileURL(File file) {
-    URI uri = file.toURI();
-    try {
-      return uri.toURL();
-    } catch (MalformedURLException e) {
-      throw new SamzaException("Unable to get URL for file: " + file, e);
-    }
-  }
-
-  /**
-   * Get the {@link URI} for a {@link URL}.
-   * Converts checked exceptions into {@link SamzaException}s.
-   */
-  private static URI urlToURI(URL url) {
-    try {
-      return url.toURI();
-    } catch (URISyntaxException e) {
-      throw new SamzaException("Unable to get URI for URL: " + url, e);
-    }
-  }
-
-  /**
-   * Get the {@link File} representing the {@link #LIB_DIRECTORY} inside the given {@code file}.
-   */
-  private static File libDirectory(File file) {
-    return new File(file, LIB_DIRECTORY);
-  }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
index a152032..523742c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinatorRunner.java
@@ -26,7 +26,6 @@
 import java.util.List;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.classloader.IsolatingClassLoaderFactory;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -35,7 +34,6 @@
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.util.CoordinatorStreamUtil;
-import org.apache.samza.util.SplitDeploymentUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,19 +50,12 @@
       LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. Exiting job coordinator", exception);
       System.exit(1);
     });
-    if (!SplitDeploymentUtil.isSplitDeploymentEnabled()) {
-      // no isolation enabled, so can just execute runClusterBasedJobCoordinator directly
-      runClusterBasedJobCoordinator(args);
-    } else {
-      SplitDeploymentUtil.runWithClassLoader(new IsolatingClassLoaderFactory().buildClassLoader(),
-          ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
-    }
+    runClusterBasedJobCoordinator(args);
     System.exit(0);
   }
 
   /**
-   * This is the actual execution for the {@link ClusterBasedJobCoordinator}. This is separated out from
-   * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
+   * This is the actual execution for the {@link ClusterBasedJobCoordinator}.
    */
   @VisibleForTesting
   static void runClusterBasedJobCoordinator(String[] args) {
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManager.java
new file mode 100644
index 0000000..34a5aad
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import java.util.Collections;
+import java.util.Set;
+
+
+/**
+ * Simple placeholder implementation of {@link FaultDomainManager} which places all hosts in a single fault domain.
+ * This can be used when another concrete {@link FaultDomainManager} is undesirable or unavailable, but features that
+ * depend on a {@link FaultDomainManager} (such as standby containers) may then behave unexpectedly.
+ */
+public class SingleFaultDomainManager implements FaultDomainManager {
+  private static final FaultDomain SINGLE_FAULT_DOMAIN = new FaultDomain(FaultDomainType.RACK, "0");
+
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return Collections.singleton(SINGLE_FAULT_DOMAIN);
+  }
+
+  @Override
+  public Set<FaultDomain> getFaultDomainsForHost(String host) {
+    return Collections.singleton(SINGLE_FAULT_DOMAIN);
+  }
+
+  @Override
+  public boolean hasSameFaultDomains(String host1, String host2) {
+    return true;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManagerFactory.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManagerFactory.java
new file mode 100644
index 0000000..db96b15
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SingleFaultDomainManagerFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Produces a simple placeholder {@link FaultDomainManager}. See {@link SingleFaultDomainManager}.
+ */
+public class SingleFaultDomainManagerFactory implements FaultDomainManagerFactory {
+  @Override
+  public FaultDomainManager getFaultDomainManager(Config config, MetricsRegistry metricsRegistry) {
+    return new SingleFaultDomainManager();
+  }
+}
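A minimal usage sketch (not part of this patch) of the placeholder fault domain manager added above: every host maps to the single RACK "0" domain, so any two hosts are reported as sharing a fault domain. The host names and the empty config/registry are illustrative only.

    import java.util.Set;
    import org.apache.samza.clustermanager.FaultDomain;
    import org.apache.samza.clustermanager.FaultDomainManager;
    import org.apache.samza.clustermanager.SingleFaultDomainManagerFactory;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.metrics.MetricsRegistryMap;

    public class SingleFaultDomainManagerExample {
      public static void main(String[] args) {
        // the factory ignores the config and registry contents when building the placeholder manager
        FaultDomainManager manager =
            new SingleFaultDomainManagerFactory().getFaultDomainManager(new MapConfig(), new MetricsRegistryMap());
        Set<FaultDomain> allDomains = manager.getAllFaultDomains();            // single RACK "0" fault domain
        boolean sameDomain = manager.hasSameFaultDomains("host-a", "host-b");  // always true
        System.out.println(allDomains + " sameFaultDomain=" + sameDomain);
      }
    }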
diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index 3f27991..acb5a0a 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -137,6 +137,20 @@
   private static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";
   private static final String CLUSTER_MANAGER_JMX_ENABLED = "cluster-manager.jobcoordinator.jmx.enabled";
 
+  /**
+   * Use this to configure a static port for the job coordinator URL for a Samza job. This URL is used to serve
+   * information such as the job model and locality.
+   * If the value is set to 0, then the port will be dynamically allocated from the available free ports on the node.
+   * The default value of this config is 0.
+   *
+   * Be careful when using this configuration. If the configured port is already in use on the node, then the job
+   * coordinator will fail to start.
+   *
+   * This configuration is experimental, and it might be removed in a future release.
+   */
+  private static final String JOB_COORDINATOR_URL_PORT = "cluster-manager.jobcoordinator.url.port";
+  private static final int DEFAULT_JOB_COORDINATOR_URL_PORT = 0;
+
   public ClusterManagerConfig(Config config) {
       super(config);
   }
@@ -280,4 +294,8 @@
       return true;
     }
   }
+
+  public int getCoordinatorUrlPort() {
+    return getInt(JOB_COORDINATOR_URL_PORT, DEFAULT_JOB_COORDINATOR_URL_PORT);
+  }
 }
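A hedged sketch (not part of this patch) of reading the new static-port config; the port value 8090 is only an example, and omitting the config keeps the default of 0 (dynamic allocation).

    import com.google.common.collect.ImmutableMap;
    import org.apache.samza.config.ClusterManagerConfig;
    import org.apache.samza.config.MapConfig;

    public class CoordinatorUrlPortExample {
      public static void main(String[] args) {
        ClusterManagerConfig staticPort = new ClusterManagerConfig(
            new MapConfig(ImmutableMap.of("cluster-manager.jobcoordinator.url.port", "8090")));
        System.out.println(staticPort.getCoordinatorUrlPort());   // 8090

        ClusterManagerConfig dynamicPort = new ClusterManagerConfig(new MapConfig());
        System.out.println(dynamicPort.getCoordinatorUrlPort());  // 0: a free port is picked at runtime
      }
    }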
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 12257e0..4822067 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -155,8 +155,6 @@
   public static final String COORDINATOR_STREAM_FACTORY = "job.coordinatorstream.config.factory";
   public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = "org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";
 
-  public static final String JOB_SPLIT_DEPLOYMENT_ENABLED = "job.split.deployment.enabled";
-
   private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
 
   // Enable ClusterBasedJobCoordinator aka ApplicationMaster High Availability (AM-HA).
@@ -414,10 +412,6 @@
     return getStandbyTaskReplicationFactor() > 1;
   }
 
-  public boolean isSplitDeploymentEnabled() {
-    return getBoolean(JOB_SPLIT_DEPLOYMENT_ENABLED, false);
-  }
-
   /**
    * The metadata file is written in a {@code exec-env-container-id}.metadata file in the log-dir of the container.
    * Here the {@code exec-env-container-id} refers to the ID assigned by the cluster manager (e.g., YARN) to the container,
diff --git a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
index 73093a8..73bcf8e 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
@@ -57,29 +57,6 @@
    */
   public static final String ENV_EXECUTION_ENV_CONTAINER_ID = "EXECUTION_ENV_CONTAINER_ID";
 
-  /**
-   * Set to "true" if split deployment feature is enabled. Otherwise, will be considered false.
-   *
-   * The launch process for the cluster-based job coordinator and job container depends on the value of this, since it
-   * needs to be known if the cluster-based job coordinator and job container should be launched in a split deployment
-   * mode.
-   * This needs to be an environment variable, because the value needs to be known before the full configs can be read
-   * from the metadata store (full configs are only read after launch is complete).
-   */
-  public static final String ENV_SPLIT_DEPLOYMENT_ENABLED = "ENV_SPLIT_DEPLOYMENT_ENABLED";
-
-  /**
-   * When running the cluster-based job coordinator and job container in a split deployment mode, it uses JARs and
-   * resources from a lib directory which is provided by the framework. In some cases, it is necessary to use some
-   * resources specified by the application as well. This environment variable can be set to a directory which is
-   * different from the framework lib directory in order to tell Samza where application resources live.
-   * This is an environment variable because it is needed in order to launch the cluster-based job coordinator and job
-   * container Java processes, which means access to full configs is not available yet.
-   * For example, this is used to set a system property for the location of an application-specified log4j configuration
-   * file when launching the cluster-based job coordinator and job container Java processes.
-   */
-  public static final String ENV_APPLICATION_LIB_DIR = "APPLICATION_LIB_DIR";
-
   /*
    * The base directory for storing logged data stores used in Samza. This has to be set on all machine running Samza
    * containers. For example, when using YARN, it has to be set in all NMs and passed to the containers.
diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 4367244..b170bac 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -262,17 +262,6 @@
   }
 
   /**
-   * Helper method to check if a system has a changelog attached to it.
-   */
-  public boolean isChangelogSystem(String systemName) {
-    return getStoreNames().stream()
-        .map(this::getChangelogStream)
-        .filter(Optional::isPresent)
-        .map(systemStreamName -> StreamUtil.getSystemStreamFromNames(systemStreamName.get()).getSystem())
-        .anyMatch(system -> system.equals(systemName));
-  }
-
-  /**
    * Helper method to check if there is any stores configured w/ a changelog
    */
   public boolean hasDurableStores() {
diff --git a/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
index 3d3a53d..804ef93 100644
--- a/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
+++ b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
@@ -21,12 +21,24 @@
 
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
+
+/**
+ * This class passes objects around for the implementation of the high-level API:
+ * 1) It is a container for objects that need to be shared between different components in the implementation of the
+ * high-level API.
+ * 2) The implementation of the high-level API is built on top of the low-level API. The low-level API only exposes
+ * {@link TaskContext}, but the implementation of the high-level API needs some other internal Samza components (e.g.
+ * {@link StreamMetadataCache}). We internally make these components available through {@link TaskContextImpl} and cast
+ * to access them. This class hides some of the messiness of that casting, although needing to cast at all is still
+ * not ideal, even in this class.
+ */
 public class InternalTaskContext {
-
   private final Context context;
   private final Map<String, Object> objectRegistry = new HashMap<>();
 
@@ -46,6 +58,10 @@
     return context;
   }
 
+  /**
+   * TODO: The public {@link JobContext} exposes {@link JobModel} now, so can this internal method be replaced by the
+   * public API?
+   */
   public JobModel getJobModel() {
     return ((TaskContextImpl) this.context.getTaskContext()).getJobModel();
   }
@@ -53,4 +69,11 @@
   public StreamMetadataCache getStreamMetadataCache() {
     return ((TaskContextImpl) this.context.getTaskContext()).getStreamMetadataCache();
   }
+
+  /**
+   * See {@link TaskContextImpl#getSspsExcludingSideInputs()}.
+   */
+  public Set<SystemStreamPartition> getSspsExcludingSideInputs() {
+    return ((TaskContextImpl) this.context.getTaskContext()).getSspsExcludingSideInputs();
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
index edec17d..d87a5bc 100644
--- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
@@ -29,9 +29,15 @@
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.TableManager;
 
+import java.util.Set;
 import java.util.function.Function;
 
 
+/**
+ * This class provides the implementation for the public {@link TaskContext} interface.
+ * It also allows us to pass certain internal Samza components around so that the implementation of the high-level API
+ * can use them (see {@link InternalTaskContext} for more details).
+ */
 public class TaskContextImpl implements TaskContext {
   private final TaskModel taskModel;
   private final MetricsRegistry taskMetricsRegistry;
@@ -39,8 +45,13 @@
   private final TableManager tableManager;
   private final CallbackScheduler callbackScheduler;
   private final OffsetManager offsetManager;
+
+  // The instance variables below are not used for implementing any public API methods. They are here so that we can
+  // pass some internal components over to the implementation of the high-level API. See InternalTaskContext.
+
   private final JobModel jobModel;
   private final StreamMetadataCache streamMetadataCache;
+  private final Set<SystemStreamPartition> sspsExcludingSideInputs;
 
   public TaskContextImpl(TaskModel taskModel,
       MetricsRegistry taskMetricsRegistry,
@@ -49,7 +60,8 @@
       CallbackScheduler callbackScheduler,
       OffsetManager offsetManager,
       JobModel jobModel,
-      StreamMetadataCache streamMetadataCache) {
+      StreamMetadataCache streamMetadataCache,
+      Set<SystemStreamPartition> sspsExcludingSideInputs) {
     this.taskModel = taskModel;
     this.taskMetricsRegistry = taskMetricsRegistry;
     this.keyValueStoreProvider = keyValueStoreProvider;
@@ -58,6 +70,7 @@
     this.offsetManager = offsetManager;
     this.jobModel = jobModel;
     this.streamMetadataCache = streamMetadataCache;
+    this.sspsExcludingSideInputs = sspsExcludingSideInputs;
   }
 
   @Override
@@ -101,4 +114,14 @@
   public StreamMetadataCache getStreamMetadataCache() {
     return this.streamMetadataCache;
   }
-}
+
+  /**
+   * Returns the {@link SystemStreamPartition}s excluding the side-input SSPs. For the high-level API, watermarks and
+   * end-of-stream messages are propagated based on their input SSPs. However, the Samza framework does not give side
+   * input messages to the high-level operator tasks. Therefore, the operators need to know the input SSPs excluding the
+   * side input SSPs. See SAMZA-2303 for more details.
+   */
+  public Set<SystemStreamPartition> getSspsExcludingSideInputs() {
+    return this.sspsExcludingSideInputs;
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
index 860c596..7e40382 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
@@ -36,7 +36,6 @@
 import org.apache.samza.job.JobCoordinatorMetadata;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.Serde;
@@ -284,44 +283,44 @@
     private static final String METADATA_WRITE_FAILED_COUNT = "metadata-write-failed-count";
     private static final String NEW_DEPLOYMENT = "new-deployment";
 
-    private final Counter applicationAttemptCount;
-    private final Counter metadataGenerationFailedCount;
-    private final Counter metadataReadFailedCount;
-    private final Counter metadataWriteFailedCount;
+    private final Gauge<Integer> applicationAttemptCount;
+    private final Gauge<Integer> metadataGenerationFailedCount;
+    private final Gauge<Integer> metadataReadFailedCount;
+    private final Gauge<Integer> metadataWriteFailedCount;
     private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
     private final Gauge<Integer> configChangedAcrossApplicationAttempt;
     private final Gauge<Integer> newDeployment;
 
     public JobCoordinatorMetadataManagerMetrics(MetricsRegistry registry) {
-      applicationAttemptCount = registry.newCounter(GROUP, APPLICATION_ATTEMPT_COUNT);
+      applicationAttemptCount = registry.newGauge(GROUP, APPLICATION_ATTEMPT_COUNT, 0);
       configChangedAcrossApplicationAttempt =
           registry.newGauge(GROUP, CONFIG_CHANGED, 0);
       jobModelChangedAcrossApplicationAttempt =
           registry.newGauge(GROUP, JOB_MODEL_CHANGED, 0);
-      metadataGenerationFailedCount = registry.newCounter(GROUP,
-          METADATA_GENERATION_FAILED_COUNT);
-      metadataReadFailedCount = registry.newCounter(GROUP, METADATA_READ_FAILED_COUNT);
-      metadataWriteFailedCount = registry.newCounter(GROUP, METADATA_WRITE_FAILED_COUNT);
+      metadataGenerationFailedCount = registry.newGauge(GROUP,
+          METADATA_GENERATION_FAILED_COUNT, 0);
+      metadataReadFailedCount = registry.newGauge(GROUP, METADATA_READ_FAILED_COUNT, 0);
+      metadataWriteFailedCount = registry.newGauge(GROUP, METADATA_WRITE_FAILED_COUNT, 0);
       newDeployment = registry.newGauge(GROUP, NEW_DEPLOYMENT, 0);
     }
 
     @VisibleForTesting
-    Counter getApplicationAttemptCount() {
+    Gauge<Integer> getApplicationAttemptCount() {
       return applicationAttemptCount;
     }
 
     @VisibleForTesting
-    Counter getMetadataGenerationFailedCount() {
+    Gauge<Integer> getMetadataGenerationFailedCount() {
       return metadataGenerationFailedCount;
     }
 
     @VisibleForTesting
-    Counter getMetadataReadFailedCount() {
+    Gauge<Integer> getMetadataReadFailedCount() {
       return metadataReadFailedCount;
     }
 
     @VisibleForTesting
-    Counter getMetadataWriteFailedCount() {
+    Gauge<Integer> getMetadataWriteFailedCount() {
       return metadataWriteFailedCount;
     }
 
@@ -341,19 +340,19 @@
     }
 
     void incrementApplicationAttemptCount() {
-      applicationAttemptCount.inc();
+      applicationAttemptCount.set(applicationAttemptCount.getValue() + 1);
     }
 
     void incrementMetadataGenerationFailedCount() {
-      metadataGenerationFailedCount.inc();
+      metadataGenerationFailedCount.set(metadataGenerationFailedCount.getValue() + 1);
     }
 
     void incrementMetadataReadFailedCount() {
-      metadataReadFailedCount.inc();
+      metadataReadFailedCount.set(metadataReadFailedCount.getValue() + 1);
     }
 
     void incrementMetadataWriteFailedCount() {
-      metadataWriteFailedCount.inc();
+      metadataWriteFailedCount.set(metadataWriteFailedCount.getValue() + 1);
     }
 
     void setConfigChangedAcrossApplicationAttempt(int value) {
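For reference, a small sketch (not part of this patch) of the gauge-as-counter pattern the metrics above now follow: increments are expressed as set(getValue() + 1) on an Integer gauge created with an initial value of 0. The group and metric names below are illustrative.

    import org.apache.samza.metrics.Gauge;
    import org.apache.samza.metrics.MetricsRegistryMap;

    public class GaugeAsCounterExample {
      public static void main(String[] args) {
        MetricsRegistryMap registry = new MetricsRegistryMap();
        Gauge<Integer> attempts = registry.newGauge("example-group", "application-attempt-count", 0);
        attempts.set(attempts.getValue() + 1);    // equivalent of Counter.inc(), but reported as a gauge
        System.out.println(attempts.getValue());  // 1
      }
    }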
diff --git a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java
new file mode 100644
index 0000000..3725344
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job;
+
+import java.io.File;
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.ShellCommandConfig;
+
+
+public class ShellCommandBuilder extends CommandBuilder {
+  @Override
+  public String buildCommand() {
+    ShellCommandConfig shellCommandConfig = new ShellCommandConfig(config);
+    if (StringUtils.isEmpty(this.commandPath)) {
+      return shellCommandConfig.getCommand();
+    } else {
+      return this.commandPath + File.separator + shellCommandConfig.getCommand();
+    }
+  }
+
+  @Override
+  public Map<String, String> buildEnvironment() {
+    ShellCommandConfig shellCommandConfig = new ShellCommandConfig(config);
+    ImmutableMap.Builder<String, String> envBuilder = new ImmutableMap.Builder<>();
+    envBuilder.put(ShellCommandConfig.ENV_CONTAINER_ID, this.id);
+    envBuilder.put(ShellCommandConfig.ENV_COORDINATOR_URL, this.url.toString());
+    envBuilder.put(ShellCommandConfig.ENV_JAVA_OPTS, shellCommandConfig.getTaskOpts().orElse(""));
+    envBuilder.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR,
+        shellCommandConfig.getAdditionalClasspathDir().orElse(""));
+    shellCommandConfig.getJavaHome().ifPresent(javaHome -> envBuilder.put(ShellCommandConfig.ENV_JAVA_HOME, javaHome));
+    return envBuilder.build();
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporter.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporter.java
new file mode 100644
index 0000000..5961d7a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.Metric;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsVisitor;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of {@link MetricsReporter} which logs the metrics that match a regex.
+ * The regex is checked against "[source name]-[group name]-[metric name]".
+ */
+public class LoggingMetricsReporter implements MetricsReporter {
+  private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class);
+  /**
+   * First part is the source name, second part is the group name, third part is the metric name.
+   */
+  private static final String FULL_METRIC_FORMAT = "%s-%s-%s";
+
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final Pattern metricsToLog;
+  private final long loggingIntervalSeconds;
+  private final Queue<Runnable> loggingTasks = new ConcurrentLinkedQueue<>();
+
+  /**
+   * @param scheduledExecutorService executes the logging tasks
+   * @param metricsToLog Only log the metrics which match this regex. The strings for matching against this metric are
+   *                     constructed by concatenating source name, group name, and metric name, delimited by dashes.
+   * @param loggingIntervalSeconds interval at which to log metrics
+   */
+  public LoggingMetricsReporter(ScheduledExecutorService scheduledExecutorService, Pattern metricsToLog,
+      long loggingIntervalSeconds) {
+    this.scheduledExecutorService = scheduledExecutorService;
+    this.metricsToLog = metricsToLog;
+    this.loggingIntervalSeconds = loggingIntervalSeconds;
+  }
+
+  @Override
+  public void start() {
+    this.scheduledExecutorService.scheduleAtFixedRate(() -> this.loggingTasks.forEach(Runnable::run),
+        this.loggingIntervalSeconds, this.loggingIntervalSeconds, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void register(String source, ReadableMetricsRegistry registry) {
+    this.loggingTasks.add(buildLoggingTask(source, registry));
+  }
+
+  @Override
+  public void stop() {
+    this.scheduledExecutorService.shutdown();
+    try {
+      this.scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while shutting down executor", e);
+    }
+    if (!this.scheduledExecutorService.isTerminated()) {
+      LOG.warn("Unable to shutdown executor");
+    }
+  }
+
+  /**
+   * VisibleForTesting so that the logging call can be verified in unit tests.
+   */
+  @VisibleForTesting
+  void doLog(String logString) {
+    LOG.info(logString);
+  }
+
+  private Runnable buildLoggingTask(String source, ReadableMetricsRegistry registry) {
+    return () -> {
+      for (String group : registry.getGroups()) {
+        for (Map.Entry<String, Metric> metricGroupEntry : registry.getGroup(group).entrySet()) {
+          metricGroupEntry.getValue().visit(new MetricsVisitor() {
+            @Override
+            public void counter(Counter counter) {
+              logMetric(source, group, counter.getName(), counter.getCount());
+            }
+
+            @Override
+            public <T> void gauge(Gauge<T> gauge) {
+              logMetric(source, group, gauge.getName(), gauge.getValue());
+            }
+
+            @Override
+            public void timer(Timer timer) {
+              logMetric(source, group, timer.getName(), timer.getSnapshot().getAverage());
+            }
+          });
+        }
+      }
+    };
+  }
+
+  private <T> void logMetric(String source, String group, String metricName, T value) {
+    String fullMetricName = String.format(FULL_METRIC_FORMAT, source, group, metricName);
+    if (this.metricsToLog.matcher(fullMetricName).matches()) {
+      doLog(String.format("Metric: %s, Value: %s", fullMetricName, value));
+    }
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterConfig.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterConfig.java
new file mode 100644
index 0000000..e7a256f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Optional;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+
+
+public class LoggingMetricsReporterConfig extends MapConfig {
+  private static final String METRICS_TO_LOG_REGEX_CONFIG = "metrics.reporter.%s.log.regex";
+  private static final String LOGGING_INTERVAL_SECONDS_CONFIG = "metrics.reporter.%s.logging.interval.seconds";
+  private static final long LOGGING_INTERVAL_SECONDS_DEFAULT = 60;
+
+  public LoggingMetricsReporterConfig(Config config) {
+    super(config);
+  }
+
+  public String getMetricsToLogRegex(String reporterName) {
+    String metricsToLogConfigKey = String.format(METRICS_TO_LOG_REGEX_CONFIG, reporterName);
+    return Optional.ofNullable(get(metricsToLogConfigKey))
+        .orElseThrow(() -> new ConfigException("Missing value for " + metricsToLogConfigKey));
+  }
+
+  public long getLoggingIntervalSeconds(String reporterName) {
+    return getLong(String.format(LOGGING_INTERVAL_SECONDS_CONFIG, reporterName), LOGGING_INTERVAL_SECONDS_DEFAULT);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterFactory.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterFactory.java
new file mode 100644
index 0000000..88d3892
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.regex.Pattern;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsReporterFactory;
+
+
+/**
+ * Creates a {@link MetricsReporter} which logs metrics and their values.
+ * This can be used to access metric values when no other external metrics system is available.
+ */
+public class LoggingMetricsReporterFactory implements MetricsReporterFactory {
+  @Override
+  public MetricsReporter getMetricsReporter(String name, String processorId, Config config) {
+    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setNameFormat("Samza LoggingMetricsReporter Thread-%d").setDaemon(true).build());
+    LoggingMetricsReporterConfig loggingMetricsReporterConfig = new LoggingMetricsReporterConfig(config);
+    Pattern metricsToLog = Pattern.compile(loggingMetricsReporterConfig.getMetricsToLogRegex(name));
+    return new LoggingMetricsReporter(scheduledExecutorService, metricsToLog,
+        loggingMetricsReporterConfig.getLoggingIntervalSeconds(name));
+  }
+}
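A hedged sketch (not part of this patch) of wiring the new reporter by hand; the reporter name "my-logging", the regex, and the 30-second interval are illustrative. Only metrics whose "[source]-[group]-[metric]" string matches the regex are logged.

    import com.google.common.collect.ImmutableMap;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.metrics.MetricsReporter;
    import org.apache.samza.metrics.reporter.LoggingMetricsReporterFactory;

    public class LoggingReporterExample {
      public static void main(String[] args) {
        MapConfig config = new MapConfig(ImmutableMap.of(
            "metrics.reporter.my-logging.log.regex", ".*-SamzaContainerMetrics-.*",
            "metrics.reporter.my-logging.logging.interval.seconds", "30"));
        MetricsReporter reporter =
            new LoggingMetricsReporterFactory().getMetricsReporter("my-logging", "processor-0", config);
        reporter.start();  // registries added via register(...) are logged every 30 seconds until stop()
      }
    }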
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 705f0cb..19cec80 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -111,17 +111,13 @@
       LOG.info("{} has {} producer tasks.", stream, count);
     });
 
-    // set states for end-of-stream
+    // set states for end-of-stream; don't include side inputs (see SAMZA-2303)
     internalTaskContext.registerObject(EndOfStreamStates.class.getName(),
-        new EndOfStreamStates(
-                internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
-                producerTaskCounts));
-    // set states for watermark
+        new EndOfStreamStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts));
+    // set states for watermark; don't include side inputs (see SAMZA-2303)
     internalTaskContext.registerObject(WatermarkStates.class.getName(),
-        new WatermarkStates(
-                internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
-                producerTaskCounts,
-                context.getContainerContext().getContainerMetricsRegistry()));
+        new WatermarkStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts,
+            context.getContainerContext().getContainerMetricsRegistry()));
 
     specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
       SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index bc4b227..87429ed 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -68,7 +68,7 @@
     List<IncomingMessageEnvelope> messages = bufferedMessages.get(ssp);
     String offset = String.valueOf(messages.size());
 
-    if (message instanceof EndOfStreamMessage) {
+    if (shouldUseEndOfStreamOffset(message)) {
       offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET;
     }
 
@@ -224,4 +224,22 @@
 
     return ImmutableList.copyOf(messageEnvelopesForSSP.subList(startingOffset, messageEnvelopesForSSP.size()));
   }
+
+  /**
+   * We don't want to use {@link IncomingMessageEnvelope#END_OF_STREAM_OFFSET} for every
+   * {@link EndOfStreamMessage}. Certain control message flows (e.g. end-of-stream) have an aggregation partition,
+   * which needs to listen for messages from all other partitions. These aggregation messages are marked by the task
+   * name being non-null. If we use {@link IncomingMessageEnvelope#END_OF_STREAM_OFFSET} for the aggregation messages,
+   * then the aggregation partition would stop listening once it got the message from one of the tasks, but that means
+   * it would miss the aggregation messages from all other tasks. See SAMZA-2300 for more details.
+   * One other note: If there is a serializer set for the stream, then by the time the message gets to this check, it
+   * will be a byte array, so this check will not return true, even if the deserialized message was an
+   * {@link EndOfStreamMessage}. So far this isn't a problem, because we only really need this to return true for
+   * input streams (not intermediate streams), and in-memory input stream data doesn't get serialized. For intermediate
+   * streams, we don't need END_OF_STREAM_OFFSET to be used since the high-level operators take care of end-of-stream
+   * messages based on MessageType.
+   */
+  private static boolean shouldUseEndOfStreamOffset(Object message) {
+    return (message instanceof EndOfStreamMessage) && ((EndOfStreamMessage) message).getTaskName() == null;
+  }
 }
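As a rough illustration (not part of this patch, and assuming the no-arg and task-name constructors of EndOfStreamMessage), only end-of-stream messages without a task name are treated as terminal by the check added above:

    import org.apache.samza.system.EndOfStreamMessage;

    public class EndOfStreamOffsetExample {
      public static void main(String[] args) {
        Object broadcast = new EndOfStreamMessage();             // no task name: END_OF_STREAM_OFFSET would be used
        Object aggregation = new EndOfStreamMessage("task-0");   // task name set: kept as a regular message
        System.out.println(isTerminal(broadcast) + " " + isTerminal(aggregation)); // true false
      }

      // mirrors the private shouldUseEndOfStreamOffset check in InMemoryManager
      private static boolean isTerminal(Object message) {
        return (message instanceof EndOfStreamMessage) && ((EndOfStreamMessage) message).getTaskName() == null;
      }
    }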
diff --git a/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java b/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
deleted file mode 100644
index 200cd3c..0000000
--- a/samza-core/src/main/java/org/apache/samza/util/SplitDeploymentUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.ShellCommandConfig;
-
-
-public final class SplitDeploymentUtil {
-
-  /**
-   * The split deployment feature uses system env {@code ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED} to represent
-   * if the user chooses to enable it.
-   * This function helps to detect if the split deployment feature is enabled.
-   *
-   * @return true if split deployment is enabled; vice versa
-   */
-  public static boolean isSplitDeploymentEnabled() {
-    return Boolean.parseBoolean(System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
-  }
-
-  /**
-   * Execute the runner class using a separate isolated classloader.
-   * @param classLoader {@link ClassLoader} to use to load the runner class which will run
-   * @param originalRunnerClass {@link Class} for which will be executed with the new class loader.
-   * @param runMethodName run method name of runner class
-   * @param runMethodArgs arguments to pass to run method
-   */
-  public static void runWithClassLoader(ClassLoader classLoader, Class<?> originalRunnerClass, String runMethodName,
-      String[] runMethodArgs) {
-    // need to use the isolated classloader to load run method and then execute using that new class
-    Class<?> runnerClass;
-    try {
-      runnerClass = classLoader.loadClass(originalRunnerClass.getName());
-    } catch (ClassNotFoundException e) {
-      throw new SamzaException(String.format(
-          "Isolation was enabled, but unable to find %s in isolated classloader", originalRunnerClass.getName()), e);
-    }
-
-    // save the current context classloader so it can be reset after finishing the call to run method
-    ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
-    // this is needed because certain libraries (e.g. log4j) use the context classloader
-    Thread.currentThread().setContextClassLoader(classLoader);
-
-    try {
-      executeRunForRunnerClass(runnerClass, runMethodName, runMethodArgs);
-    } finally {
-      // reset the context class loader; it's good practice, and could be important when running a test suite
-      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
-    }
-  }
-
-  private static void executeRunForRunnerClass(Class<?> runnerClass, String runMethodName, String[] runMethodArgs) {
-    Method runMethod;
-    try {
-      runMethod = runnerClass.getDeclaredMethod(runMethodName, String[].class);
-    } catch (NoSuchMethodException e) {
-      throw new SamzaException(String.format("Isolation was enabled, but unable to find %s method", runMethodName), e);
-    }
-    // only sets accessible flag for this method instance
-    runMethod.setAccessible(true);
-
-    try {
-      // wrapping args in object array so that args is passed as a single argument to the method
-      runMethod.invoke(null, new Object[]{runMethodArgs});
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      throw new SamzaException(String.format("Exception while executing %s method", runMethodName), e);
-    }
-  }
-}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index f91aae1..4e1c41a 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -88,7 +88,7 @@
       }
     })
   private val taskContext = new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager,
-    new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache)
+    new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache, systemStreamPartitions)
   // need separate field for this instead of using it through Context, since Context throws an exception if it is null
   private val applicationTaskContextOption = applicationTaskContextFactoryOption
     .map(_.create(externalContextOption.orNull, jobContext, containerContext, taskContext,
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 7c0e747..5dd662b 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -100,7 +100,8 @@
 
       updateTaskAssignments(jobModel, taskAssignmentManager, taskPartitionAssignmentManager, grouperMetadata)
 
-      val server = new HttpServer
+      val clusterManagerConfig = new ClusterManagerConfig(config)
+      val server = new HttpServer(port = clusterManagerConfig.getCoordinatorUrlPort)
       server.addServlet("/", new JobServlet(serializedJobModelRef))
       server.addServlet("/locality", new LocalityServlet(localityManager))
 
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
deleted file mode 100644
index 9b95648..0000000
--- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job
-
-
-import java.io.File
-
-import org.apache.samza.config.ShellCommandConfig
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
-
-import scala.collection.JavaConverters._
-
-class ShellCommandBuilder extends CommandBuilder {
-  def buildCommand() = {
-    val shellCommandConfig = new ShellCommandConfig(config)
-    if(commandPath == null || commandPath.isEmpty())
-      shellCommandConfig.getCommand
-    else
-      commandPath + File.separator +  shellCommandConfig.getCommand
-  }
-
-  def buildEnvironment(): java.util.Map[String, String] = {
-    val shellCommandConfig = new ShellCommandConfig(config)
-    val envMap = Map(
-      ShellCommandConfig.ENV_CONTAINER_ID -> id.toString,
-      ShellCommandConfig.ENV_COORDINATOR_URL -> url.toString,
-      ShellCommandConfig.ENV_JAVA_OPTS -> shellCommandConfig.getTaskOpts.orElse(""),
-      ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR -> shellCommandConfig.getAdditionalClasspathDir.orElse(""))
-
-    val envMapWithJavaHome = JavaOptionals.toRichOptional(shellCommandConfig.getJavaHome).toOption match {
-      case Some(javaHome) => envMap + (ShellCommandConfig.ENV_JAVA_HOME -> javaHome)
-      case None => envMap
-    }
-
-    envMapWithJavaHome.asJava
-  }
-}
diff --git a/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java b/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
deleted file mode 100644
index 7444fbf..0000000
--- a/samza-core/src/test/java/org/apache/samza/classloader/TestIsolatingClassLoaderFactory.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.classloader;
-
-import java.io.File;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Set;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.samza.SamzaException;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-public class TestIsolatingClassLoaderFactory {
-  @Test
-  public void testGetApiClasses() throws URISyntaxException {
-    File apiClassListFile = Paths.get(getClass().getResource("/classloader").toURI()).toFile();
-    List<String> apiClassNames = IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(apiClassListFile);
-    List<String> expected = ImmutableList.of(
-        "org.apache.samza.JavaClass",
-        "org.apache.samza.JavaClass$InnerJavaClass",
-        "org.apache.samza.ScalaClass$",
-        "org.apache.samza.ScalaClass$$anon$1",
-        "my.package.with.wildcard.*",
-        "my.package.with.question.mark?");
-    assertEquals(expected, apiClassNames);
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testGetApiClassesFileDoesNotExist() throws URISyntaxException {
-    File nonExistentDirectory =
-        new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
-    IsolatingClassLoaderFactory.getFrameworkApiClassGlobs(nonExistentDirectory);
-  }
-
-  @Test
-  public void testGetClasspathAsURLs() throws URISyntaxException {
-    File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
-    URL[] classpath = IsolatingClassLoaderFactory.getClasspathAsURLs(classpathDirectory);
-    assertEquals(2, classpath.length);
-    Set<URL> classpathSet = ImmutableSet.copyOf(classpath);
-    URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
-    assertTrue(classpathSet.contains(jarUrl));
-    URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
-    assertTrue(classpathSet.contains(warUrl));
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testGetClasspathAsURLsDirectoryDoesNotExist() throws URISyntaxException {
-    File nonExistentDirectory =
-        new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
-    IsolatingClassLoaderFactory.getClasspathAsURLs(nonExistentDirectory);
-  }
-
-  @Test
-  public void testGetClasspathAsURIs() throws URISyntaxException {
-    File classpathDirectory = Paths.get(getClass().getResource("/classloader/classpath").toURI()).toFile();
-    List<URI> classpath = IsolatingClassLoaderFactory.getClasspathAsURIs(classpathDirectory);
-    assertEquals(2, classpath.size());
-    Set<URI> classpathSet = ImmutableSet.copyOf(classpath);
-    URL jarUrl = getClass().getResource("/classloader/classpath/placeholder-jar.jar");
-    assertTrue(classpathSet.contains(jarUrl.toURI()));
-    URL warUrl = getClass().getResource("/classloader/classpath/placeholder-war.war");
-    assertTrue(classpathSet.contains(warUrl.toURI()));
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testGetClasspathAsURIsDirectoryDoesNotExist() throws URISyntaxException {
-    File nonExistentDirectory =
-        new File(Paths.get(getClass().getResource("/classloader").toURI()).toFile(), "doesNotExist");
-    IsolatingClassLoaderFactory.getClasspathAsURIs(nonExistentDirectory);
-  }
-}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 00c2d5e..af4bd1d 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -546,17 +546,6 @@
   }
 
   @Test
-  public void testGetClusterBasedJobCoordinatorDependencyIsolationEnabled() {
-    Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
-    assertTrue(new JobConfig(config).isSplitDeploymentEnabled());
-
-    config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false"));
-    assertFalse(new JobConfig(config).isSplitDeploymentEnabled());
-
-    assertFalse(new JobConfig(new MapConfig()).isSplitDeploymentEnabled());
-  }
-
-  @Test
   public void testGetMetadataFile() {
     String execEnvContainerId = "container-id";
     String containerMetadataDirectory = "/tmp/samza/log/dir";
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 5dba698..0d06c3b 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -359,18 +359,6 @@
   }
 
   @Test
-  public void testIsChangelogSystem() {
-    StorageConfig storageConfig = new StorageConfig(new MapConfig(ImmutableMap.of(
-        // store0 has a changelog stream
-        String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class",
-        String.format(CHANGELOG_STREAM, STORE_NAME0), "system0.changelog-stream",
-        // store1 does not have a changelog stream
-        String.format(StorageConfig.FACTORY, STORE_NAME1), "factory.class")));
-    assertTrue(storageConfig.isChangelogSystem("system0"));
-    assertFalse(storageConfig.isChangelogSystem("other-system"));
-  }
-
-  @Test
   public void testHasDurableStores() {
     // no changelog, which means no durable stores
     StorageConfig storageConfig = new StorageConfig(
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
index 0e8f78e..094583e 100644
--- a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
@@ -62,7 +62,7 @@
     MockitoAnnotations.initMocks(this);
     taskContext =
         new TaskContextImpl(taskModel, taskMetricsRegistry, keyValueStoreProvider, tableManager, callbackScheduler,
-            offsetManager, null, null);
+            offsetManager, null, null, null);
     when(this.taskModel.getTaskName()).thenReturn(TASK_NAME);
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
index d623aed..b1739d9 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
@@ -141,13 +141,13 @@
     JobCoordinatorMetadata newMetadataWithNoChange =
         new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
     assertEquals("Application attempt count should be 0", 0,
-        metrics.getApplicationAttemptCount().getCount());
+        metrics.getApplicationAttemptCount().getValue().intValue());
 
     metadataChanged =
         jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, newMetadataWithNoChange);
     assertFalse("Metadata check should return false", metadataChanged);
     assertEquals("Application attempt count should be 1", 1,
-        metrics.getApplicationAttemptCount().getCount());
+        metrics.getApplicationAttemptCount().getValue().intValue());
   }
 
   @Test
@@ -161,7 +161,7 @@
     } catch (Exception e) {
       assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
       assertEquals("Metadata generation failed count should be 1", 1,
-          jobCoordinatorMetadataManager.getMetrics().getMetadataGenerationFailedCount().getCount());
+          jobCoordinatorMetadataManager.getMetrics().getMetadataGenerationFailedCount().getValue().intValue());
     }
   }
 
@@ -211,7 +211,7 @@
     JobCoordinatorMetadata actualMetadata = jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
     assertNull("Read failed should return null", actualMetadata);
     assertEquals("Metadata read failed count should be 1", 1,
-        jobCoordinatorMetadataManager.getMetrics().getMetadataReadFailedCount().getCount());
+        jobCoordinatorMetadataManager.getMetrics().getMetadataReadFailedCount().getValue().intValue());
   }
 
   @Test
@@ -240,7 +240,7 @@
     } catch (Exception e) {
       assertTrue("Expecting SamzaException to be thrown", e instanceof SamzaException);
       assertEquals("Metadata write failed count should be 1", 1,
-          jobCoordinatorMetadataManager.getMetrics().getMetadataWriteFailedCount().getCount());
+          jobCoordinatorMetadataManager.getMetrics().getMetadataWriteFailedCount().getValue().intValue());
     }
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java
new file mode 100644
index 0000000..ca7be0e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestShellCommandBuilder {
+  private static final String URL_STRING = "http://www.google.com";
+
+  @Test
+  public void testBasicBuild() throws MalformedURLException {
+    Config config = new MapConfig(ImmutableMap.of(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo"));
+    ShellCommandBuilder shellCommandBuilder = new ShellCommandBuilder();
+    shellCommandBuilder.setConfig(config);
+    shellCommandBuilder.setId("1");
+    shellCommandBuilder.setUrl(new URL(URL_STRING));
+    Map<String, String> expectedEnvironment = ImmutableMap.of(
+        ShellCommandConfig.ENV_CONTAINER_ID, "1",
+        ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING,
+        ShellCommandConfig.ENV_JAVA_OPTS, "",
+        ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
+    // assertions when command path is not set
+    assertEquals("foo", shellCommandBuilder.buildCommand());
+    assertEquals(expectedEnvironment, shellCommandBuilder.buildEnvironment());
+    // assertions when command path is set to empty string
+    shellCommandBuilder.setCommandPath("");
+    assertEquals("foo", shellCommandBuilder.buildCommand());
+    assertEquals(expectedEnvironment, shellCommandBuilder.buildEnvironment());
+  }
+
+  @Test
+  public void testBuildEnvironment() throws MalformedURLException {
+    Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
+        .put(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo")
+        .put(ShellCommandConfig.TASK_JVM_OPTS, "-Xmx4g")
+        .put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath")
+        .put(ShellCommandConfig.TASK_JAVA_HOME, "/path/to/java/home")
+        .build());
+    ShellCommandBuilder shellCommandBuilder = new ShellCommandBuilder();
+    shellCommandBuilder.setConfig(config);
+    shellCommandBuilder.setId("1");
+    shellCommandBuilder.setUrl(new URL(URL_STRING));
+    Map<String, String> expectedEnvironment = new ImmutableMap.Builder<String, String>()
+        .put(ShellCommandConfig.ENV_CONTAINER_ID, "1")
+        .put(ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING)
+        .put(ShellCommandConfig.ENV_JAVA_OPTS, "-Xmx4g")
+        .put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath")
+        .put(ShellCommandConfig.ENV_JAVA_HOME, "/path/to/java/home")
+        .build();
+    assertEquals(expectedEnvironment, shellCommandBuilder.buildEnvironment());
+  }
+
+  @Test
+  public void testBuildCommandWithCommandPath() throws MalformedURLException {
+    Config config = new MapConfig(ImmutableMap.of(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo"));
+    ShellCommandBuilder shellCommandBuilder = new ShellCommandBuilder();
+    shellCommandBuilder.setConfig(config);
+    shellCommandBuilder.setId("1");
+    shellCommandBuilder.setUrl(new URL(URL_STRING));
+    shellCommandBuilder.setCommandPath("/package/path");
+    assertEquals("/package/path/foo", shellCommandBuilder.buildCommand());
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporter.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporter.java
new file mode 100644
index 0000000..c1f5972
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.Metric;
+import org.apache.samza.metrics.MetricsVisitor;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.metrics.Snapshot;
+import org.apache.samza.metrics.Timer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestLoggingMetricsReporter {
+  private static final long LOGGING_INTERVAL_SECONDS = 15;
+  private static final String COUNTER_NAME = "counter_name";
+  private static final long COUNTER_VALUE = 10;
+  private static final String GAUGE_NAME = "gauge_name";
+  private static final double GAUGE_VALUE = 20.0;
+  private static final String TIMER_NAME = "timer_name";
+  private static final double TIMER_VALUE = 30.0;
+  private static final Pattern DEFAULT_PATTERN = Pattern.compile(".*_name");
+  private static final String GROUP_NAME = "group_name";
+  private static final String SOURCE_NAME = "source_name";
+
+  @Mock
+  private ScheduledExecutorService scheduledExecutorService;
+  @Mock
+  private ReadableMetricsRegistry readableMetricsRegistry;
+  @Mock
+  private Counter counter;
+  @Mock
+  private Gauge<Double> gauge;
+  @Mock
+  private Timer timer;
+  @Mock
+  private Snapshot timerSnapshot;
+
+  private LoggingMetricsReporter loggingMetricsReporter;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    when(this.scheduledExecutorService.scheduleAtFixedRate(any(), eq(LOGGING_INTERVAL_SECONDS),
+        eq(LOGGING_INTERVAL_SECONDS), eq(TimeUnit.SECONDS))).thenAnswer((Answer<Void>) invocation -> {
+          Runnable runnable = invocation.getArgumentAt(0, Runnable.class);
+          runnable.run();
+          return null;
+        });
+
+    when(this.counter.getName()).thenReturn(COUNTER_NAME);
+    when(this.counter.getCount()).thenReturn(COUNTER_VALUE);
+    doAnswer(invocation -> {
+      invocation.getArgumentAt(0, MetricsVisitor.class).counter(this.counter);
+      return null;
+    }).when(this.counter).visit(any());
+
+    when(this.gauge.getName()).thenReturn(GAUGE_NAME);
+    when(this.gauge.getValue()).thenReturn(GAUGE_VALUE);
+    doAnswer(invocation -> {
+      invocation.getArgumentAt(0, MetricsVisitor.class).gauge(this.gauge);
+      return null;
+    }).when(this.gauge).visit(any());
+
+    when(this.timer.getName()).thenReturn(TIMER_NAME);
+    when(this.timer.getSnapshot()).thenReturn(this.timerSnapshot);
+    doAnswer(invocation -> {
+      invocation.getArgumentAt(0, MetricsVisitor.class).timer(this.timer);
+      return null;
+    }).when(this.timer).visit(any());
+    when(this.timerSnapshot.getAverage()).thenReturn(TIMER_VALUE);
+
+    this.loggingMetricsReporter =
+        spy(new LoggingMetricsReporter(this.scheduledExecutorService, DEFAULT_PATTERN, LOGGING_INTERVAL_SECONDS));
+  }
+
+  @Test
+  public void testMetricTypes() {
+    when(this.readableMetricsRegistry.getGroups()).thenReturn(Collections.singleton(GROUP_NAME));
+    Map<String, Metric> metrics =
+        ImmutableMap.of(COUNTER_NAME, this.counter, GAUGE_NAME, this.gauge, TIMER_NAME, this.timer);
+    when(this.readableMetricsRegistry.getGroup(GROUP_NAME)).thenReturn(metrics);
+
+    this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+    this.loggingMetricsReporter.start();
+
+    verify(this.loggingMetricsReporter).doLog("Metric: source_name-group_name-counter_name, Value: 10");
+    verify(this.loggingMetricsReporter).doLog("Metric: source_name-group_name-gauge_name, Value: 20.0");
+    verify(this.loggingMetricsReporter).doLog("Metric: source_name-group_name-timer_name, Value: 30.0");
+  }
+
+  @Test
+  public void testMultipleRegister() {
+    when(this.readableMetricsRegistry.getGroups()).thenReturn(Collections.singleton(GROUP_NAME));
+    when(this.readableMetricsRegistry.getGroup(GROUP_NAME)).thenReturn(ImmutableMap.of(COUNTER_NAME, this.counter));
+    ReadableMetricsRegistry otherRegistry = mock(ReadableMetricsRegistry.class);
+    String otherGroupName = "other_group";
+    when(otherRegistry.getGroups()).thenReturn(Collections.singleton(otherGroupName));
+    when(otherRegistry.getGroup(otherGroupName)).thenReturn(ImmutableMap.of(GAUGE_NAME, this.gauge));
+
+    this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+    this.loggingMetricsReporter.register("other_source", otherRegistry);
+    this.loggingMetricsReporter.start();
+
+    verify(this.loggingMetricsReporter).doLog("Metric: source_name-group_name-counter_name, Value: 10");
+    verify(this.loggingMetricsReporter).doLog("Metric: other_source-other_group-gauge_name, Value: 20.0");
+  }
+
+  @Test
+  public void testFiltering() {
+    Pattern countersOnly = Pattern.compile(".*counter.*");
+    this.loggingMetricsReporter =
+        spy(new LoggingMetricsReporter(this.scheduledExecutorService, countersOnly, LOGGING_INTERVAL_SECONDS));
+
+    when(this.readableMetricsRegistry.getGroups()).thenReturn(Collections.singleton(GROUP_NAME));
+    Map<String, Metric> metrics = ImmutableMap.of(COUNTER_NAME, this.counter, GAUGE_NAME, this.gauge);
+    when(this.readableMetricsRegistry.getGroup(GROUP_NAME)).thenReturn(metrics);
+
+    this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+    this.loggingMetricsReporter.start();
+
+    ArgumentCaptor<String> logs = ArgumentCaptor.forClass(String.class);
+    verify(this.loggingMetricsReporter).doLog(logs.capture());
+    assertEquals(Collections.singletonList("Metric: source_name-group_name-counter_name, Value: 10"),
+        logs.getAllValues());
+  }
+
+  @Test
+  public void testNewMetricsAfterRegister() {
+    when(this.readableMetricsRegistry.getGroups()).thenReturn(Collections.singleton(GROUP_NAME));
+    // first round of logging has one metric (counter only), second call has two (counter and gauge)
+    when(this.readableMetricsRegistry.getGroup(GROUP_NAME)).thenReturn(ImmutableMap.of(COUNTER_NAME, this.counter))
+        .thenReturn(ImmutableMap.of(COUNTER_NAME, this.counter, GAUGE_NAME, this.gauge));
+
+    // capture the logging task so it can be directly executed by the test
+    ArgumentCaptor<Runnable> loggingRunnable = ArgumentCaptor.forClass(Runnable.class);
+    when(this.scheduledExecutorService.scheduleAtFixedRate(loggingRunnable.capture(), eq(LOGGING_INTERVAL_SECONDS),
+        eq(LOGGING_INTERVAL_SECONDS), eq(TimeUnit.SECONDS))).thenReturn(null);
+
+    this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+    this.loggingMetricsReporter.start();
+
+    // simulate first scheduled execution of logging task
+    loggingRunnable.getValue().run();
+    String expectedCounterLog = "Metric: source_name-group_name-counter_name, Value: 10";
+    // should only get a log for the counter on the first call
+    verify(this.loggingMetricsReporter).doLog(expectedCounterLog);
+    String expectedGaugeLog = "Metric: source_name-group_name-gauge_name, Value: 20.0";
+    verify(this.loggingMetricsReporter, never()).doLog(expectedGaugeLog);
+
+    // simulate second scheduled execution of logging task
+    loggingRunnable.getValue().run();
+    // should get second log for counter, first log for gauge
+    verify(this.loggingMetricsReporter, times(2)).doLog(expectedCounterLog);
+    verify(this.loggingMetricsReporter).doLog(expectedGaugeLog);
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporterConfig.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporterConfig.java
new file mode 100644
index 0000000..40b44e7
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporterConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestLoggingMetricsReporterConfig {
+  private static final String REPORTER_NAME = "reporter_name";
+
+  @Test
+  public void testGetMetricsToLogRegex() {
+    Map<String, String> configMap = ImmutableMap.of("metrics.reporter.reporter_name.log.regex", ".*metric.*");
+    assertEquals(".*metric.*",
+        new LoggingMetricsReporterConfig(new MapConfig(configMap)).getMetricsToLogRegex(REPORTER_NAME));
+  }
+
+  @Test(expected = ConfigException.class)
+  public void testGetMetricsToLogRegexMissing() {
+    new LoggingMetricsReporterConfig(new MapConfig()).getMetricsToLogRegex(REPORTER_NAME);
+  }
+
+  @Test
+  public void testGetLoggingIntervalSeconds() {
+    assertEquals(60, new LoggingMetricsReporterConfig(new MapConfig()).getLoggingIntervalSeconds(REPORTER_NAME));
+
+    Map<String, String> configMap = ImmutableMap.of("metrics.reporter.reporter_name.logging.interval.seconds", "100");
+    assertEquals(100,
+        new LoggingMetricsReporterConfig(new MapConfig(configMap)).getLoggingIntervalSeconds(REPORTER_NAME));
+  }
+}
\ No newline at end of file
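For reference, a minimal sketch (not part of the patch) of reading the two config keys exercised by the test above; the reporter name "my-reporter" and the regex value are examples only:

import com.google.common.collect.ImmutableMap;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.reporter.LoggingMetricsReporterConfig;

public class LoggingMetricsReporterConfigExample {
  public static void main(String[] args) {
    // reporter-scoped keys: metrics.reporter.<reporter-name>.log.regex and .logging.interval.seconds
    MapConfig config = new MapConfig(ImmutableMap.of(
        "metrics.reporter.my-reporter.log.regex", ".*process-calls.*",
        "metrics.reporter.my-reporter.logging.interval.seconds", "120"));
    LoggingMetricsReporterConfig reporterConfig = new LoggingMetricsReporterConfig(config);
    System.out.println(reporterConfig.getMetricsToLogRegex("my-reporter"));       // .*process-calls.*
    System.out.println(reporterConfig.getLoggingIntervalSeconds("my-reporter"));  // 120 (60 if the key is absent)
  }
}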
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 76b79a7..8218720 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -36,6 +36,7 @@
 import org.apache.samza.container.TaskName;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.MockContext;
+import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.system.descriptors.GenericSystemDescriptor;
 import org.apache.samza.job.model.TaskModel;
@@ -93,11 +94,13 @@
     Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
     Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
 
+    SystemStreamPartition ssp = new SystemStreamPartition("kafka", "integers", new Partition(0));
     TaskModel taskModel = mock(TaskModel.class);
-    when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet
-        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+    when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet.of(ssp));
     when(taskModel.getTaskName()).thenReturn(new TaskName("task 1"));
     when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+    when(((TaskContextImpl) this.context.getTaskContext()).getSspsExcludingSideInputs()).thenReturn(
+        ImmutableSet.of(ssp));
     when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     when(this.context.getTaskContext().getStore("jobName-jobId-window-w1"))
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
index 0a2e221..9439b01 100644
--- a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
@@ -127,7 +127,7 @@
   }
 
   @Test
-  public void testEndOfStreamMessage() {
+  public void testEndOfStreamMessageWithTask() {
     EndOfStreamMessage eos = new EndOfStreamMessage("test-task");
 
     produceMessages(eos);
@@ -139,6 +139,24 @@
     List<IncomingMessageEnvelope> results = consumeRawMessages(sspsToPoll);
 
     assertEquals(1, results.size());
+    assertEquals("test-task", ((EndOfStreamMessage) results.get(0).getMessage()).getTaskName());
+    assertFalse(results.get(0).isEndOfStream());
+  }
+
+  @Test
+  public void testEndOfStreamMessageWithoutTask() {
+    EndOfStreamMessage eos = new EndOfStreamMessage();
+
+    produceMessages(eos);
+
+    Set<SystemStreamPartition> sspsToPoll = IntStream.range(0, PARTITION_COUNT)
+        .mapToObj(partition -> new SystemStreamPartition(SYSTEM_STREAM, new Partition(partition)))
+        .collect(Collectors.toSet());
+
+    List<IncomingMessageEnvelope> results = consumeRawMessages(sspsToPoll);
+
+    assertEquals(1, results.size());
+    assertNull(((EndOfStreamMessage) results.get(0).getMessage()).getTaskName());
     assertTrue(results.get(0).isEndOfStream());
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
deleted file mode 100644
index 72772ba..0000000
--- a/samza-core/src/test/java/org/apache/samza/util/TestSplitDeploymentUtil.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import org.apache.samza.clustermanager.ClusterBasedJobCoordinatorRunner;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.*;
-import static org.mockito.AdditionalMatchers.*;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.*;
-
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ClusterBasedJobCoordinatorRunner.class})
-public class TestSplitDeploymentUtil {
-
-  @Test
-  public void testRunWithIsolatingClassLoader() throws Exception {
-    // partially mock ClusterBasedJobCoordinator (mock runClusterBasedJobCoordinator method only)
-    PowerMockito.spy(ClusterBasedJobCoordinatorRunner.class);
-    // save the context classloader to make sure that it gets set properly once the test is finished
-    ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
-    ClassLoader classLoader = mock(ClassLoader.class);
-    String[] args = new String[]{"arg0", "arg1"};
-    doReturn(ClusterBasedJobCoordinatorRunner.class).when(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
-
-    // stub the private static method which is called by reflection
-    PowerMockito.doAnswer(invocation -> {
-        // make sure the only calls to this method has the expected arguments
-      assertArrayEquals(args, invocation.getArgumentAt(0, String[].class));
-      // checks that the context classloader is set correctly
-      assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
-      return null;
-    }).when(ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", any());
-
-    try {
-      SplitDeploymentUtil.runWithClassLoader(classLoader,
-          ClusterBasedJobCoordinatorRunner.class, "runClusterBasedJobCoordinator", args);
-      assertEquals(previousContextClassLoader, Thread.currentThread().getContextClassLoader());
-    } finally {
-      // reset it explicitly just in case runWithClassLoader throws an exception
-      Thread.currentThread().setContextClassLoader(previousContextClassLoader);
-    }
-    // make sure that the classloader got used
-    verify(classLoader).loadClass(ClusterBasedJobCoordinatorRunner.class.getName());
-    // make sure runClusterBasedJobCoordinator only got called once
-    verifyPrivate(ClusterBasedJobCoordinatorRunner.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
-  }
-}
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
deleted file mode 100644
index c70af8d..0000000
--- a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job
-
-import org.junit.Assert._
-import org.junit.Test
-import scala.collection.JavaConverters._
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.ShellCommandConfig
-import java.net.URL
-
-class TestShellCommandBuilder {
-  @Test
-  def testEnvironmentVariables {
-    val urlStr = "http://www.google.com"
-    val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
-    val scb = new ShellCommandBuilder
-    scb.setConfig(config)
-    scb.setId("1")
-    scb.setUrl(new URL(urlStr))
-    val command = scb.buildCommand
-    val environment = scb.buildEnvironment
-    assertEquals("foo", command)
-    assertEquals("1", environment.get(ShellCommandConfig.ENV_CONTAINER_ID))
-    assertEquals(urlStr, environment.get(ShellCommandConfig.ENV_COORDINATOR_URL))
-  }
-
-  // if cmdPath is specified, the full path to the command should be adjusted
-  @Test
-  def testBuildCommandWithCommandPath {
-    val urlStr = "http://www.linkedin.com"
-    val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
-    val scb = new ShellCommandBuilder
-    scb.setConfig(config)
-    scb.setId("1")
-    scb.setUrl(new URL(urlStr))
-    val command = scb.buildCommand
-    assertEquals("foo", command)
-
-    scb.setCommandPath("/package/path")
-    val command1 = scb.buildCommand
-    assertEquals("/package/path/foo", command1)
-  }
-}
\ No newline at end of file
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
index 01c22f5..97443e3 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
@@ -55,10 +55,6 @@
     Preconditions.checkNotNull(type);
     Preconditions.checkState(!grouperFactoryClassName.isEmpty(), "Empty grouper factory class provided");
 
-    Preconditions.checkState(CHECKPOINT_V1_KEY_TYPE.equals(type) || CHECKPOINT_V2_KEY_TYPE.equals(type),
-        String.format("Invalid type provided for checkpoint key. Expected: (%s or %s) Actual: (%s)",
-            CHECKPOINT_V1_KEY_TYPE, CHECKPOINT_V2_KEY_TYPE, type));
-
     this.grouperFactoryClassName = grouperFactoryClassName;
     this.taskName = taskName;
     this.type = type;
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
index c5c431b..59ff34f 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKeySerde.java
@@ -57,9 +57,8 @@
   public KafkaCheckpointLogKey fromBytes(byte[] bytes) {
     try {
       LinkedHashMap<String, String> deserializedKey = MAPPER.readValue(bytes, LinkedHashMap.class);
-      String key = deserializedKey.get(TYPE_FIELD);
 
-      return new KafkaCheckpointLogKey(key, new TaskName(deserializedKey.get(TASK_NAME_FIELD)),
+      return new KafkaCheckpointLogKey(deserializedKey.get(TYPE_FIELD), new TaskName(deserializedKey.get(TASK_NAME_FIELD)),
           deserializedKey.get(SSP_GROUPER_FACTORY_FIELD));
     } catch (Exception e) {
       throw new SamzaException(String.format("Exception in de-serializing checkpoint bytes: %s",
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index e719439..7dbb9b3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -335,7 +335,7 @@
     partitionMetaData.getOldestOffset
   }
 
-  private def buildOutgoingMessageEnvelope[T <: Checkpoint](taskName: TaskName, checkpoint: T): OutgoingMessageEnvelope = {
+  def buildOutgoingMessageEnvelope[T <: Checkpoint](taskName: TaskName, checkpoint: T): OutgoingMessageEnvelope = {
     checkpoint match {
       case checkpointV1: CheckpointV1 => {
         val key = new KafkaCheckpointLogKey(
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 69a9966..391536a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -381,14 +381,12 @@
   }
 
   def getKafkaSystemProducerConfig( systemName: String,
-                                    clientId: String,
-                                    injectedProps: Map[String, String] = Map()) = {
+                                    clientId: String) = {
 
     val subConf = config.subset("systems.%s.producer." format systemName, true)
     val producerProps = new util.HashMap[String, String]()
     producerProps.putAll(subConf)
     producerProps.put("client.id", clientId)
-    producerProps.putAll(injectedProps.asJava)
     new KafkaProducerConfig(systemName, clientId, producerProps)
   }
 }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 8d1fd6b..a2773f6 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -32,14 +32,6 @@
 import org.apache.samza.util._
 
 object KafkaSystemFactory extends Logging {
-  @VisibleForTesting
-  def getInjectedProducerProperties(systemName: String, config: Config) = if (new StorageConfig(config).isChangelogSystem(systemName)) {
-    warn("System name '%s' is being used as a changelog. Disabling compression since Kafka does not support compression for log compacted topics." format systemName)
-    Map[String, String]("compression.type" -> "none")
-  } else {
-    Map[String, String]()
-  }
-
   val CLIENTID_PRODUCER_PREFIX = "kafka-producer"
   val CLIENTID_CONSUMER_PREFIX = "kafka-consumer"
   val CLIENTID_ADMIN_PREFIX = "kafka-admin-consumer"
@@ -67,9 +59,8 @@
   }
 
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
-    val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
     val clientId = KafkaConsumerConfig.createClientId(KafkaSystemFactory.CLIENTID_PRODUCER_PREFIX, config);
-    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
+    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
     val getProducer = () => {
       new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
     }
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
index 7245e70..08fa02c 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKeySerde.java
@@ -53,11 +53,23 @@
 
   @Test
   public void testCheckpointTypeV2() {
-    KafkaCheckpointLogKey keyV2 = new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE,
-        new TaskName("Partition 0"), GroupByPartitionFactory.class.getCanonicalName());
+    KafkaCheckpointLogKey keyV2 = new KafkaCheckpointLogKey(KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE, new TaskName("Partition 0"),
+        GroupByPartitionFactory.class.getCanonicalName());
     KafkaCheckpointLogKeySerde checkpointKeySerde = new KafkaCheckpointLogKeySerde();
 
     // test that deserialize(serialize(k)) == k
     Assert.assertEquals(keyV2, checkpointKeySerde.fromBytes(checkpointKeySerde.toBytes(keyV2)));
   }
+
+  @Test
+  public void testForwardsCompatibility() {
+    // Set the key to another value; this covers a future case where we may support additional checkpoint key types.
+    // The serde layer should not throw for unknown key types; they must instead be validated in the CheckpointManager.
+    KafkaCheckpointLogKey key = new KafkaCheckpointLogKey("checkpoint-v2",
+        new TaskName("Partition 0"), GroupByPartitionFactory.class.getCanonicalName());
+    KafkaCheckpointLogKeySerde checkpointSerde = new KafkaCheckpointLogKeySerde();
+
+    // test that deserialize(serialize(k)) == k
+    Assert.assertEquals(key, checkpointSerde.fromBytes(checkpointSerde.toBytes(key)));
+  }
 }
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 866904b..835f53e 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -20,7 +20,6 @@
 package org.apache.samza.checkpoint.kafka
 
 import java.util.Properties
-
 import kafka.integration.KafkaServerTestHarness
 import kafka.utils.{CoreUtils, TestUtils}
 import com.google.common.collect.ImmutableMap
@@ -29,7 +28,7 @@
 import org.apache.samza.container.TaskName
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.serializers.CheckpointV1Serde
+import org.apache.samza.serializers.{CheckpointV1Serde, CheckpointV2Serde}
 import org.apache.samza.system._
 import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
@@ -276,6 +275,42 @@
   }
 
   @Test
+  def testReadCheckpointShouldIgnoreUnknownCheckpointKeys(): Unit = {
+    val checkpointTopic = "checkpoint-topic-1"
+    val kcm1 = createKafkaCheckpointManager(checkpointTopic)
+    kcm1.register(taskName)
+    kcm1.createResources
+    kcm1.start
+    kcm1.stop
+
+    // check that start actually creates the topic with log compaction enabled
+    val topicConfig = adminZkClient.getAllTopicConfigs().getOrElse(checkpointTopic, new Properties())
+
+    assertEquals(topicConfig, new KafkaConfig(config).getCheckpointTopicProperties())
+    assertEquals("compact", topicConfig.get("cleanup.policy"))
+    assertEquals("26214400", topicConfig.get("segment.bytes"))
+
+    // read before topic exists should result in a null checkpoint
+    val readCp = readCheckpoint(checkpointTopic, taskName)
+    assertNull(readCp)
+    // skips unknown checkpoints from checkpoint topic
+    writeCheckpoint(checkpointTopic, taskName, checkpoint1, "checkpoint-v2", useMock = true)
+    assertNull(readCheckpoint(checkpointTopic, taskName, useMock = true))
+
+    // reads latest v1 checkpoints
+    writeCheckpoint(checkpointTopic, taskName, checkpoint1, useMock = true)
+    assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName, useMock = true))
+
+    // writing checkpoint v2 still returns the previous v1 checkpoint
+    writeCheckpoint(checkpointTopic, taskName, checkpoint2, "checkpoint-v2", useMock = true)
+    assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName, useMock = true))
+
+    // writing checkpoint2 with the correct key returns the checkpoint2
+    writeCheckpoint(checkpointTopic, taskName, checkpoint2, useMock = true)
+    assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName, useMock = true))
+  }
+
+  @Test
   def testWriteCheckpointShouldRetryFiniteTimesOnFailure(): Unit = {
     val checkpointTopic = "checkpoint-topic-2"
     val mockKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
@@ -401,7 +436,8 @@
   }
 
   private def createKafkaCheckpointManager(cpTopic: String, serde: CheckpointV1Serde = new CheckpointV1Serde,
-    failOnTopicValidation: Boolean = true, overrideConfig: Config = config) = {
+    failOnTopicValidation: Boolean = true, useMock: Boolean = false, checkpointKey: String = KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE,
+    overrideConfig: Config = config) = {
     val kafkaConfig = new org.apache.samza.config.KafkaConfig(overrideConfig)
     val props = kafkaConfig.getCheckpointTopicProperties()
     val systemName = kafkaConfig.getCheckpointSystem.getOrElse(
@@ -414,11 +450,17 @@
     val systemFactory = ReflectionUtil.getObj(systemFactoryClassName, classOf[SystemFactory])
 
     val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, props)
-    new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, overrideConfig, new NoOpMetricsRegistry, serde)
+
+    if (useMock) {
+      new MockKafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, serde, checkpointKey)
+    } else {
+      new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, overrideConfig, new NoOpMetricsRegistry, serde)
+    }
   }
 
-  private def readCheckpoint(checkpointTopic: String, taskName: TaskName, config: Config = config) : Checkpoint = {
-    val kcm = createKafkaCheckpointManager(checkpointTopic, overrideConfig = config)
+  private def readCheckpoint(checkpointTopic: String, taskName: TaskName, config: Config = config,
+    useMock: Boolean = false) : Checkpoint = {
+    val kcm = createKafkaCheckpointManager(checkpointTopic, overrideConfig = config, useMock = useMock)
     kcm.register(taskName)
     kcm.start
     val checkpoint = kcm.readLastCheckpoint(taskName)
@@ -426,8 +468,9 @@
     checkpoint
   }
 
-  private def writeCheckpoint(checkpointTopic: String, taskName: TaskName, checkpoint: Checkpoint): Unit = {
-    val kcm = createKafkaCheckpointManager(checkpointTopic)
+  private def writeCheckpoint(checkpointTopic: String, taskName: TaskName, checkpoint: Checkpoint,
+    checkpointKey: String = KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE, useMock: Boolean = false): Unit = {
+    val kcm = createKafkaCheckpointManager(checkpointTopic, checkpointKey = checkpointKey, useMock = useMock)
     kcm.register(taskName)
     kcm.start
     kcm.writeCheckpoint(taskName, checkpoint)
@@ -456,4 +499,35 @@
     }
   }
 
+
+  class MockKafkaCheckpointManager(spec: KafkaStreamSpec, systemFactory: SystemFactory, failOnTopicValidation: Boolean,
+    serde: CheckpointV1Serde = new CheckpointV1Serde, checkpointKey: String)
+    extends KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config,
+      new NoOpMetricsRegistry, serde) {
+
+    override def buildOutgoingMessageEnvelope[T <: Checkpoint](taskName: TaskName, checkpoint: T): OutgoingMessageEnvelope = {
+      val key = new KafkaCheckpointLogKey(checkpointKey, taskName, expectedGrouperFactory)
+      val keySerde = new KafkaCheckpointLogKeySerde
+      val checkpointMsgSerde = new CheckpointV1Serde
+      val checkpointV2MsgSerde = new CheckpointV2Serde
+      val keyBytes = try {
+        keySerde.toBytes(key)
+      } catch {
+        case e: Exception => throw new SamzaException(s"Exception when writing checkpoint-key for $taskName: $checkpoint", e)
+      }
+      val msgBytes = try {
+        checkpoint match {
+          case v1: CheckpointV1 =>
+            checkpointMsgSerde.toBytes(v1)
+          case v2: CheckpointV2 =>
+            checkpointV2MsgSerde.toBytes(v2)
+          case _ =>
+            throw new IllegalArgumentException("Unknown checkpoint type for test; please use CheckpointV1 or CheckpointV2")
+        }
+      } catch {
+        case e: Exception => throw new SamzaException(s"Exception when writing checkpoint for $taskName: $checkpoint", e)
+      }
+      new OutgoingMessageEnvelope(checkpointSsp, keyBytes, msgBytes)
+    }
+  }
 }
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index 596d67b..ecbc00d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -82,16 +82,4 @@
     assertNotNull(producer)
     assertTrue(producer.isInstanceOf[KafkaSystemProducer])
   }
-
-  @Test
-  def testInjectedProducerProps {
-    val configMap = Map[String, String](
-      StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
-      StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
-      StorageConfig.FACTORY.format("system2") -> "some.factory.Class")
-    val config = new MapConfig(configMap.asJava)
-    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config))
-    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config))
-    assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config))
-  }
 }
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index 0908a97..ac2b3f0 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -69,6 +69,8 @@
   private static final String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
   private static final String ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS = "rocksdb.delete.obsolete.files.period.micros";
   private static final String ROCKSDB_MAX_MANIFEST_FILE_SIZE = "rocksdb.max.manifest.file.size";
+  private static final String ROCKSDB_MAX_OPEN_FILES = "rocksdb.max.open.files";
+  private static final String ROCKSDB_MAX_FILE_OPENING_THREADS = "rocksdb.max.file.opening.threads";
 
   public static Options options(Config storeConfig, int numTasksForContainer, File storeDir, StorageEngineFactory.StoreMode storeMode) {
     Options options = new Options();
@@ -124,6 +126,8 @@
     options.setMaxLogFileSize(storeConfig.getLong(ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, 64 * 1024 * 1024L));
     options.setKeepLogFileNum(storeConfig.getLong(ROCKSDB_KEEP_LOG_FILE_NUM, 2));
     options.setDeleteObsoleteFilesPeriodMicros(storeConfig.getLong(ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS, 21600000000L));
+    options.setMaxOpenFiles(storeConfig.getInt(ROCKSDB_MAX_OPEN_FILES, -1));
+    options.setMaxFileOpeningThreads(storeConfig.getInt(ROCKSDB_MAX_FILE_OPENING_THREADS, 16));
     // The default for rocksdb is 18446744073709551615, which is larger than java Long.MAX_VALUE. Hence setting it only if it's passed.
     if (storeConfig.containsKey(ROCKSDB_MAX_MANIFEST_FILE_SIZE)) {
       options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE));
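As a quick illustration (not part of the patch), the two new keys are read from the store-scoped config, i.e. with the stores.<store-name>. prefix already stripped; the values and store directory below are arbitrary examples, and running this requires the RocksDB native library on the classpath:

import java.io.File;
import com.google.common.collect.ImmutableMap;
import org.apache.samza.config.MapConfig;
import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.storage.kv.RocksDbOptionsHelper;
import org.rocksdb.Options;

public class RocksDbOptionsHelperExample {
  public static void main(String[] args) {
    MapConfig storeConfig = new MapConfig(ImmutableMap.of(
        "rocksdb.max.open.files", "5000",
        "rocksdb.max.file.opening.threads", "32"));
    // one task in the container, example store directory
    Options options = RocksDbOptionsHelper.options(storeConfig, 1, new File("/tmp/example-store"),
        StorageEngineFactory.StoreMode.ReadWrite);
    System.out.println(options.maxOpenFiles());          // 5000
    System.out.println(options.maxFileOpeningThreads()); // 32
  }
}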
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
index 0044904..f9a1f24 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
@@ -49,6 +49,9 @@
   static final public String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers";
   static final public String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
   static final public String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
+  static final public String ROCKSDB_MAX_OPEN_FILES = "rocksdb.max.open.files";
+  static final public String ROCKSDB_MAX_FILE_OPENING_THREADS = "rocksdb.max.file.opening.threads";
+
 
   private Integer writeBatchSize;
   private Integer objectCacheSize;
@@ -61,6 +64,8 @@
   private Integer numLogFilesToKeep;
   private String compressionType;
   private String compactionStyle;
+  private Integer maxOpenFiles;
+  private Integer maxFileOpeningThreads;
 
   /**
    * Constructs a table descriptor instance
@@ -273,6 +278,35 @@
     return this;
   }
 
+  /**
+   * Limits the number of open files that can be used by the DB. You may need to increase this if your database
+   * has a large working set. A value of -1 means opened files are always kept open.
+   * <p>
+   *  Default value is -1.
+   * <p>
+   * @param maxOpenFiles the number of open files that can be used by the DB.
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withMaxOpenFiles(int maxOpenFiles) {
+    this.maxOpenFiles = maxOpenFiles;
+    return this;
+  }
+
+  /**
+   * Sets the number of threads used to open DB files.
+   * If max_open_files is -1, DB will open all files on DB::Open(). You can use this option to increase the number of
+   * threads used to open files.
+   * <p>
+   * Default is 16.
+   * <p>
+   * @param maxFileOpeningThreads The number of threads to use when opening DB files.
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withMaxFileOpeningThreads(int maxFileOpeningThreads) {
+    this.maxFileOpeningThreads = maxFileOpeningThreads;
+    return this;
+  }
+
   @Override
   public String getProviderFactoryClassName() {
     return LocalTableProviderFactory.class.getName();
@@ -320,6 +354,12 @@
     if (numLogFilesToKeep != null) {
       addStoreConfig(ROCKSDB_KEEP_LOG_FILE_NUM, numLogFilesToKeep.toString(), tableConfig);
     }
+    if (maxOpenFiles != null) {
+      addStoreConfig(ROCKSDB_MAX_OPEN_FILES, maxOpenFiles.toString(), tableConfig);
+    }
+    if (maxFileOpeningThreads != null) {
+      addStoreConfig(ROCKSDB_MAX_FILE_OPENING_THREADS, maxFileOpeningThreads.toString(), tableConfig);
+    }
 
     return Collections.unmodifiableMap(tableConfig);
   }
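A short usage sketch (not part of the patch) of the two new descriptor options; the table id and serdes are placeholders:

import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;

public class RocksDbTableDescriptorExample {
  public static void main(String[] args) {
    RocksDbTableDescriptor<String, Integer> descriptor =
        new RocksDbTableDescriptor<>("example-table", KVSerde.of(new StringSerde(), new IntegerSerde()))
            .withMaxOpenFiles(5000)          // maps to rocksdb.max.open.files; -1 (default) keeps all files open
            .withMaxFileOpeningThreads(32);  // maps to rocksdb.max.file.opening.threads; default is 16
    System.out.println(descriptor.getTableId());
  }
}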
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
index c58d123..9fc9938 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -73,10 +73,12 @@
         .withTtl(7)
         .withWriteBatchSize(8)
         .withWriteBufferSize(9)
+        .withMaxOpenFiles(10)
+        .withMaxFileOpeningThreads(11)
         .withConfig("abc", "xyz")
         .toConfig(createJobConfig());
 
-    Assert.assertEquals(14, tableConfig.size());
+    Assert.assertEquals(16, tableConfig.size());
     assertEquals("1", RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES, tableConfig);
     assertEquals("2", RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES, tableConfig);
     assertEquals("3", RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, tableConfig);
@@ -86,6 +88,8 @@
     assertEquals("7", RocksDbTableDescriptor.ROCKSDB_TTL_MS, tableConfig);
     assertEquals("8", RocksDbTableDescriptor.WRITE_BATCH_SIZE, tableConfig);
     assertEquals("9", RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES, tableConfig);
+    assertEquals("10", RocksDbTableDescriptor.ROCKSDB_MAX_OPEN_FILES, tableConfig);
+    assertEquals("11", RocksDbTableDescriptor.ROCKSDB_MAX_FILE_OPENING_THREADS, tableConfig);
     assertEquals("snappy", RocksDbTableDescriptor.ROCKSDB_COMPRESSION, tableConfig);
     assertEquals("fifo", RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE, tableConfig);
     Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM, TABLE_ID)));
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index 2b03c31..f82a576 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -81,7 +81,7 @@
   private static final String CREATE_STREAM_ENABLED = "task.log4j.create.stream.enabled";
 
   private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Arbitrary choice
-  private final BlockingQueue<byte[]> logQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
+  private final BlockingQueue<EncodedLogEvent> logQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
 
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
@@ -233,13 +233,13 @@
    */
   private void handleEvent(LogEvent event) throws InterruptedException {
     if (usingAsyncLogger) {
-      sendEventToSystemProducer(encodeLogEventToBytes(event));
+      sendEventToSystemProducer(encodeLogEvent(event));
       return;
     }
 
     // Serialize the event before adding to the queue to leverage the caller thread
     // and ensure that the transferThread can keep up.
-    if (!logQueue.offer(encodeLogEventToBytes(event), queueTimeoutS, TimeUnit.SECONDS)) {
+    if (!logQueue.offer(encodeLogEvent(event), queueTimeoutS, TimeUnit.SECONDS)) {
       // Do NOT retry adding system to the queue. Dropping the event allows us to alleviate the unlikely
       // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
       // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
@@ -265,8 +265,8 @@
     metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
   }
 
-  protected byte[] encodeLogEventToBytes(LogEvent event) {
-    return serde.toBytes(subLog(event));
+  protected EncodedLogEvent encodeLogEvent(LogEvent event) {
+    return new ByteArrayEncodedLogEvent(serde.toBytes(subLog(event)));
   }
 
   private Message subAppend(LogEvent event) {
@@ -437,21 +437,21 @@
 
   /**
    * Helper method to send a serialized log-event to the systemProducer, and increment respective methods.
-   * @param serializedLogEvent
+   * @param logQueueEntry the serialized log-event to be sent to the systemProducer
    */
-  private void sendEventToSystemProducer(byte[] serializedLogEvent) {
-    metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
+  private void sendEventToSystemProducer(EncodedLogEvent logQueueEntry) {
+    metrics.logMessagesBytesSent.inc(logQueueEntry.getEntryValueSize());
     metrics.logMessagesCountSent.inc();
-    systemProducer.send(SOURCE, decorateLogEvent(serializedLogEvent));
+    systemProducer.send(SOURCE, decorateLogEvent(logQueueEntry));
   }
 
   /**
    * Helper method to create an OutgoingMessageEnvelope from the serialized log event.
-   * @param messageBytes message bytes
+   * @param logQueueEntry the encoded log event whose payload is wrapped in the envelope
    * @return OutgoingMessageEnvelope that contains the message bytes along with the system stream
    */
-  protected OutgoingMessageEnvelope decorateLogEvent(byte[] messageBytes) {
-    return new OutgoingMessageEnvelope(systemStream, keyBytes, messageBytes);
+  protected OutgoingMessageEnvelope decorateLogEvent(EncodedLogEvent logQueueEntry) {
+    return new OutgoingMessageEnvelope(systemStream, keyBytes, logQueueEntry.getValue());
   }
 
   protected String getStreamName(String jobName, String jobId) {
@@ -498,4 +498,42 @@
   public Serde<LogEvent> getSerde() {
     return serde;
   }
+
+  /**
+   * An EncodedLogEvent is the element inserted into the log queue of the stream appender;
+   * it holds an encoded log message before it is sent to the underlying system producer.
+   * @param <T> type of object held as the entry
+   */
+  protected interface EncodedLogEvent<T> {
+    /**
+     * Fetches the size of the encoded log message held by this entry.
+     * @return size of the log message
+     */
+    public long getEntryValueSize();
+
+    /**
+     * Fetches the encoded log message held by this entry.
+     * @return the actual log message
+     */
+    public T getValue();
+  }
+
+  /**
+   * EncodedLogEvent implementation that holds the serialized byte[] of the log message.
+   */
+  private class ByteArrayEncodedLogEvent implements EncodedLogEvent<byte[]> {
+    final byte[] entryValue;
+    public ByteArrayEncodedLogEvent(byte[] array) {
+      entryValue = array;
+    }
+    @Override
+    public byte[] getValue() {
+      return entryValue;
+    }
+
+    @Override
+    public long getEntryValueSize() {
+      return entryValue.length;
+    }
+  }
 }
\ No newline at end of file
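To illustrate the new abstraction (not part of the patch): an EncodedLogEvent only needs to expose the serialized payload for the outgoing envelope and its size for the bytes-sent metric. A hypothetical same-package implementation that also remembers the log level could look like this:

package org.apache.samza.logging.log4j2;

// Hypothetical alternative to ByteArrayEncodedLogEvent: carries the log level alongside the payload.
// getValue() is what decorateLogEvent() places on the OutgoingMessageEnvelope, and getEntryValueSize()
// is what sendEventToSystemProducer() uses for the logMessagesBytesSent metric.
class LevelTaggedEncodedLogEvent implements StreamAppender.EncodedLogEvent<byte[]> {
  private final byte[] payload;
  private final String level;

  LevelTaggedEncodedLogEvent(byte[] payload, String level) {
    this.payload = payload;
    this.level = level;
  }

  @Override
  public byte[] getValue() {
    return payload;
  }

  @Override
  public long getEntryValueSize() {
    return payload.length;
  }

  String getLevel() {
    return level;
  }
}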
diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh
index 9b5ac30..a162ad5 100755
--- a/samza-shell/src/main/bash/run-class.sh
+++ b/samza-shell/src/main/bash/run-class.sh
@@ -53,13 +53,13 @@
 echo APPLICATION_LIB_DIR=$APPLICATION_LIB_DIR
 echo BASE_LIB_DIR=$BASE_LIB_DIR
 
-BASE_LIB_CLASSPATH=""
+CLASSPATH=""
 # all the jars need to be appended on newlines to ensure line argument length of 72 bytes is not violated
 for file in $BASE_LIB_DIR/*.[jw]ar;
 do
-  BASE_LIB_CLASSPATH=$BASE_LIB_CLASSPATH" $file \n"
+  CLASSPATH=$CLASSPATH" $file \n"
 done
-echo generated from BASE_LIB_DIR BASE_LIB_CLASSPATH=$BASE_LIB_CLASSPATH
+echo generated from BASE_LIB_DIR CLASSPATH=$CLASSPATH
 
 # In some cases (AWS) $JAVA_HOME/bin doesn't contain jar.
 if [ -z "$JAVA_HOME" ] || [ ! -e "$JAVA_HOME/bin/jar" ]; then
@@ -68,23 +68,21 @@
   JAR="$JAVA_HOME/bin/jar"
 fi
 
-# Create a pathing JAR for the JARs in the BASE_LIB_DIR
-# Newlines and spaces are intended to ensure proper parsing of manifest in pathing jar
-printf "Class-Path: \n $BASE_LIB_CLASSPATH \n" > base-lib-manifest.txt
-# Creates a new archive and adds custom manifest information to base-lib-pathing.jar
-eval "$JAR -cvmf base-lib-manifest.txt base-lib-pathing.jar"
+# Create a separate directory for writing files related to classpath management. It is easier to manage
+# permissions for the classpath-related files when they are in their own directory. An example of where
+# this is helpful is when using container images which might have predefined permissions for certain
+# directories.
+CLASSPATH_WORKSPACE_DIR=$base_dir/classpath_workspace
+mkdir -p $CLASSPATH_WORKSPACE_DIR
+# file containing the classpath string; used to avoid passing long classpaths directly to the jar command
+PATHING_MANIFEST_FILE=$CLASSPATH_WORKSPACE_DIR/manifest.txt
+# jar file to include on the classpath for running the main class
+PATHING_JAR_FILE=$CLASSPATH_WORKSPACE_DIR/pathing.jar
 
-# Create a pathing JAR for the runtime framework resources. It is useful to separate this from the base-lib-pathing.jar
-# because the split deployment framework may only need the resources from this runtime pathing JAR.
-if ! [[ $HADOOP_CONF_DIR =~ .*/$ ]]; then
-  # manifest requires a directory to have a trailing slash
-  HADOOP_CONF_DIR="$HADOOP_CONF_DIR/"
-fi
-# HADOOP_CONF_DIR should be supplied to classpath explicitly for Yarn to parse configs
-RUNTIME_FRAMEWORK_RESOURCES_CLASSPATH="$HADOOP_CONF_DIR \n"
-# TODO add JARs from ADDITIONAL_CLASSPATH_DIR to runtime-framework-resources-pathing.jar as well
-printf "Class-Path: \n $RUNTIME_FRAMEWORK_RESOURCES_CLASSPATH \n" > runtime-framework-resources-manifest.txt
-eval "$JAR -cvmf runtime-framework-resources-manifest.txt runtime-framework-resources-pathing.jar"
+# Newlines and spaces are intended to ensure proper parsing of manifest in pathing jar
+printf "Class-Path: \n $CLASSPATH \n" > $PATHING_MANIFEST_FILE
+# Creates a new archive and adds custom manifest information to pathing.jar
+eval "$JAR -cvmf $PATHING_MANIFEST_FILE $PATHING_JAR_FILE"
 
 if [ -z "$JAVA_HOME" ]; then
   JAVA="java"
@@ -163,11 +161,12 @@
 # Check if 64 bit is set. If not - try and set it if it's supported
 [[ $JAVA_OPTS != *-d64* ]] && check_and_enable_64_bit_mode
 
-echo $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar "$@"
+# HADOOP_CONF_DIR should be supplied to classpath explicitly for Yarn to parse configs
+echo $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:$PATHING_JAR_FILE "$@"
 
 ## If localized resource lib directory is defined, then include it in the classpath.
 if [[ -z "${ADDITIONAL_CLASSPATH_DIR}" ]]; then
-  exec $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar "$@"
+  exec $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:$PATHING_JAR_FILE "$@"
 else
-  exec $JAVA $JAVA_OPTS -cp base-lib-pathing.jar:runtime-framework-resources-pathing.jar:$ADDITIONAL_CLASSPATH_DIR "$@"
-fi
\ No newline at end of file
+  exec $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:$PATHING_JAR_FILE:$ADDITIONAL_CLASSPATH_DIR "$@"
+fi
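
For background on the change above: the pathing JAR produced by run-class.sh is an otherwise-empty JAR whose manifest carries the long Class-Path string, so the java invocation only needs -cp pathing.jar. A hedged Java sketch of the equivalent operation follows; the argument paths are illustrative stand-ins for BASE_LIB_DIR and classpath_workspace/pathing.jar, not the script's actual values.

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PathingJarSketch {
  public static void main(String[] args) throws IOException {
    Path libDir = Paths.get(args[0]);       // stand-in for BASE_LIB_DIR
    Path pathingJar = Paths.get(args[1]);   // stand-in for classpath_workspace/pathing.jar

    // Space-separated Class-Path entries; the shell script uses absolute jar paths,
    // which resolve relative to the pathing JAR's location.
    String classPath;
    try (Stream<Path> entries = Files.list(libDir)) {
      classPath = entries
          .filter(p -> p.toString().endsWith(".jar") || p.toString().endsWith(".war"))
          .map(p -> p.toAbsolutePath().toString())
          .collect(Collectors.joining(" "));
    }

    Manifest manifest = new Manifest();
    manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
    manifest.getMainAttributes().put(Attributes.Name.CLASS_PATH, classPath);

    // An otherwise-empty JAR whose manifest carries the classpath; Manifest.write
    // handles the 72-byte manifest line wrapping that the script manages with
    // explicit newlines and spaces.
    try (JarOutputStream out = new JarOutputStream(new FileOutputStream(pathingJar.toFile()), manifest)) {
      // no entries needed; only the manifest matters
    }
  }
}

With the pathing JAR in place, launching the main class reduces to java -cp $HADOOP_CONF_DIR:pathing.jar <args>, which matches the echo and exec lines above.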
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index fb65eea..943a8ca 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -107,6 +107,14 @@
     this.configs = new HashMap<>();
     this.inMemoryScope = RandomStringUtils.random(10, true, true);
     configs.put(ApplicationConfig.APP_NAME, APP_NAME);
+    /*
+     * Use a unique app id to help make sure a test execution is isolated from others.
+     * A concrete example of where this helps is to avoid an issue with ControlMessageSender. It has a static cache
+     * keyed by stream id to save partition counts for intermediate streams. This means that different tests can
+     * collide in this cache if they use the same intermediate stream names. Having a unique app id makes the
+     * intermediate streams unique across tests.
+     */
+    configs.put(ApplicationConfig.APP_ID, this.inMemoryScope);
     configs.put(JobConfig.PROCESSOR_ID, "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY, InMemoryMetadataStoreFactory.class.getCanonicalName());
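
The comment above describes a collision through a static, stream-id-keyed cache. A contrived sketch of that failure mode is shown below; the cache and the stream-id format are illustrative and are not ControlMessageSender's actual internals. If an intermediate stream id embeds only the stream name, two tests sharing that name also share the cached partition count; embedding a per-test app id keeps the cache entries distinct.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IntermediateStreamCacheSketch {
  // Static cache shared by every test running in the same JVM, keyed by stream id.
  private static final Map<String, Integer> PARTITION_COUNT_CACHE = new ConcurrentHashMap<>();

  // Hypothetical stream-id scheme: embedding the app id makes the id unique per test run.
  static String intermediateStreamId(String appName, String appId, String streamName) {
    return appName + "-" + appId + "-" + streamName;
  }

  static int partitionCount(String streamId, int computedCount) {
    // Without a unique app id in streamId, a second test with a different partition
    // count silently reuses the first test's cached value.
    return PARTITION_COUNT_CACHE.computeIfAbsent(streamId, ignored -> computedCount);
  }
}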
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 0d1e49a..012eece 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -188,6 +188,7 @@
     configMap.put("systems.kafka.samza.key.serde", "string");
     configMap.put("systems.kafka.samza.msg.serde", "string");
     configMap.put("systems.kafka.samza.offset.default", "oldest");
+    configMap.put("systems.kafka.producer.compression.type", "snappy");
     configMap.put("job.coordinator.system", "kafka");
     configMap.put("job.default.system", "kafka");
     configMap.put("job.coordinator.replication.factor", "1");
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
index 78fc7b5..ea81f13 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.test.table;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -27,10 +28,8 @@
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.context.Context;
@@ -39,7 +38,6 @@
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -47,11 +45,10 @@
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
-import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.framework.TestRunner;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.apache.samza.test.util.ArraySystemFactory;
-import org.apache.samza.test.util.Base64Serializer;
-
-import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
@@ -62,50 +59,42 @@
 import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 
 
 /**
  * This test class tests sendTo() and join() for local tables
  */
-public class TestLocalTableEndToEnd extends IntegrationTestHarness {
+public class TestLocalTableEndToEnd {
+  private static final String SYSTEM_NAME = "test";
+  private static final String PAGEVIEW_STREAM = "pageview";
+  private static final String PROFILE_STREAM = "profile";
 
   @Test
-  public void testSendTo() throws Exception {
-
-    int count = 10;
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
-    int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.Profile.samza.system", "test");
-    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
+  public void testSendTo() {
     MyMapFunction mapFn = new MyMapFunction();
+    StreamApplication app = appDesc -> {
+      Table<KV<Integer, Profile>> table =
+          appDesc.getTable(new InMemoryTableDescriptor<>("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
 
-    final StreamApplication app = appDesc -> {
-
-      Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
-          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-
-      appDesc.getInputStream(isd)
-          .map(mapFn)
-          .sendTo(table);
+      appDesc.getInputStream(isd).map(mapFn).sendTo(table);
     };
 
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
-    executeRun(runner, config);
-    runner.waitForFinish();
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+    InMemoryInputDescriptor<Profile> profileStreamDesc = isd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
 
-    for (int i = 0; i < partitionCount; i++) {
+    int numProfilesPerPartition = 10;
+    int numInputPartitions = 4;
+    Map<Integer, List<Profile>> inputProfiles =
+        TestTableData.generatePartitionedProfiles(numProfilesPerPartition * numInputPartitions, numInputPartitions);
+    TestRunner.of(app).addInputStream(profileStreamDesc, inputProfiles).run(Duration.ofSeconds(10));
+
+    for (int i = 0; i < numInputPartitions; i++) {
       MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
-      assertEquals(count, mapFnCopy.received.size());
-      mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null));
+      assertEquals(numProfilesPerPartition, mapFnCopy.received.size());
+      mapFnCopy.received.forEach(p -> assertNotNull(mapFnCopy.table.get(p.getMemberId())));
     }
   }
 
@@ -116,52 +105,49 @@
     @Override
     public void describe(StreamApplicationDescriptor appDesc) {
       Table<KV<Integer, Profile>> table = appDesc.getTable(
-          new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+          new InMemoryTableDescriptor<>("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
+      profileISD.shouldBootstrap();
       appDesc.getInputStream(profileISD)
-          .map(m -> new KV(m.getMemberId(), m))
+          .map(m -> new KV<>(m.getMemberId(), m))
           .sendTo(table);
 
-      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
       appDesc.getInputStream(pageViewISD)
           .map(pv -> {
             received.add(pv);
             return pv;
           })
-          .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
+          .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()), "p1")
           .join(table, new PageViewToProfileJoinFunction())
           .sink((m, collector, coordinator) -> joined.add(m));
     }
   }
 
   @Test
-  public void testStreamTableJoin() throws Exception {
-
-    int count = 10;
-    PageView[] pageViews = TestTableData.generatePageViews(count);
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
+  public void testStreamTableJoin() {
+    int totalPageViews = 40;
     int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    Map<Integer, List<PageView>> inputPageViews =
+        TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+    // page view member ids fall in [0, 10), so 10 profiles cover every page view
+    Map<Integer, List<Profile>> inputProfiles =
+        TestTableData.generatePartitionedProfiles(10, partitionCount);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+    InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
+        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
+    InMemoryInputDescriptor<Profile> profileStreamDesc = isd
+        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
 
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+    TestRunner.of(new StreamTableJoinApp())
+        .addInputStream(pageViewStreamDesc, inputPageViews)
+        .addInputStream(profileStreamDesc, inputProfiles)
+        .run(Duration.ofSeconds(10));
 
-    configs.put("streams.Profile.samza.system", "test");
-    configs.put("streams.Profile.samza.bootstrap", "true");
-    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
-    assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
-    assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
+    assertEquals(totalPageViews, StreamTableJoinApp.received.size());
+    assertEquals(totalPageViews, StreamTableJoinApp.joined.size());
+    assertNotNull(StreamTableJoinApp.joined.get(0));
   }
 
   static class DualStreamTableJoinApp implements StreamApplication {
@@ -178,29 +164,31 @@
       PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
       PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
 
-      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
+      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor<>("t1", profileKVSerde));
 
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
-      GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor(PROFILE_STREAM + "1", new NoOpSerde<>());
+      profileISD1.shouldBootstrap();
+      GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor(PROFILE_STREAM + "2", new NoOpSerde<>());
+      profileISD2.shouldBootstrap();
       MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
       MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
 
       profileStream1
           .map(m -> {
             sentToProfileTable1.add(m);
-            return new KV(m.getMemberId(), m);
+            return new KV<>(m.getMemberId(), m);
           })
           .sendTo(profileTable);
       profileStream2
           .map(m -> {
             sentToProfileTable2.add(m);
-            return new KV(m.getMemberId(), m);
+            return new KV<>(m.getMemberId(), m);
           })
           .sendTo(profileTable);
 
-      GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
-      GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
+      GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor(PAGEVIEW_STREAM + "1", new NoOpSerde<>());
+      GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor(PAGEVIEW_STREAM + "2", new NoOpSerde<>());
       MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
       MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
 
@@ -217,45 +205,40 @@
   }
 
   @Test
-  public void testDualStreamTableJoin() throws Exception {
-
-    int count = 10;
-    PageView[] pageViews = TestTableData.generatePageViews(count);
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
+  public void testDualStreamTableJoin() {
+    int totalPageViews = 40;
     int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    Map<Integer, List<PageView>> inputPageViews1 =
+        TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+    Map<Integer, List<PageView>> inputPageViews2 =
+        TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+    // page view member ids fall in [0, 10), so 10 profiles cover every page view
+    int numProfiles = 10;
+    Map<Integer, List<Profile>> inputProfiles1 = TestTableData.generatePartitionedProfiles(numProfiles, partitionCount);
+    Map<Integer, List<Profile>> inputProfiles2 = TestTableData.generatePartitionedProfiles(numProfiles, partitionCount);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+    InMemoryInputDescriptor<PageView> pageViewStreamDesc1 = isd
+        .getInputDescriptor(PAGEVIEW_STREAM + "1", new NoOpSerde<>());
+    InMemoryInputDescriptor<PageView> pageViewStreamDesc2 = isd
+        .getInputDescriptor(PAGEVIEW_STREAM + "2", new NoOpSerde<>());
+    InMemoryInputDescriptor<Profile> profileStreamDesc1 = isd
+        .getInputDescriptor(PROFILE_STREAM + "1", new NoOpSerde<>());
+    InMemoryInputDescriptor<Profile> profileStreamDesc2 = isd
+        .getInputDescriptor(PROFILE_STREAM + "2", new NoOpSerde<>());
 
-    configs.put("streams.Profile1.samza.system", "test");
-    configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile1.samza.bootstrap", "true");
-    configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
+    TestRunner.of(new DualStreamTableJoinApp())
+        .addInputStream(pageViewStreamDesc1, inputPageViews1)
+        .addInputStream(pageViewStreamDesc2, inputPageViews2)
+        .addInputStream(profileStreamDesc1, inputProfiles1)
+        .addInputStream(profileStreamDesc2, inputProfiles2)
+        .run(Duration.ofSeconds(10));
 
-    configs.put("streams.Profile2.samza.system", "test");
-    configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile2.samza.bootstrap", "true");
-    configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.PageView1.samza.system", "test");
-    configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.PageView2.samza.system", "test");
-    configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size());
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size());
-
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size());
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size());
-    assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
-    assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
+    assertEquals(numProfiles, DualStreamTableJoinApp.sentToProfileTable1.size());
+    assertEquals(numProfiles, DualStreamTableJoinApp.sentToProfileTable2.size());
+    assertEquals(totalPageViews, DualStreamTableJoinApp.joinedPageViews1.size());
+    assertEquals(totalPageViews, DualStreamTableJoinApp.joinedPageViews2.size());
+    assertNotNull(DualStreamTableJoinApp.joinedPageViews1.get(0));
+    assertNotNull(DualStreamTableJoinApp.joinedPageViews2.get(0));
   }
 
   static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
@@ -283,8 +266,7 @@
   }
 
   private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
-
-    private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
+    private static final Map<String, MyMapFunction> TASK_TO_MAP_FUNCTION_MAP = new HashMap<>();
 
     private transient List<Profile> received;
     private transient ReadWriteTable table;
@@ -294,17 +276,17 @@
       table = context.getTaskContext().getTable("t1");
       this.received = new ArrayList<>();
 
-      taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
+      TASK_TO_MAP_FUNCTION_MAP.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
     }
 
     @Override
     public KV<Integer, Profile> apply(Profile profile) {
       received.add(profile);
-      return new KV(profile.getMemberId(), profile);
+      return new KV<>(profile.getMemberId(), profile);
     }
 
     public static MyMapFunction getMapFunctionByTask(String taskName) {
-      return taskToMapFunctionMap.get(taskName);
+      return TASK_TO_MAP_FUNCTION_MAP.get(taskName);
     }
   }
 
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
index 071f65e..34ac29a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
@@ -19,128 +19,190 @@
 
 package org.apache.samza.test.table;
 
-import com.google.common.collect.ImmutableList;
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
-import org.apache.samza.SamzaException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
-import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.framework.StreamAssert;
 import org.apache.samza.test.framework.TestRunner;
 import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
 import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
 import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
-import org.apache.samza.test.harness.IntegrationTestHarness;
 import org.junit.Test;
 
 import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
 import static org.apache.samza.test.table.TestTableData.PageView;
 import static org.apache.samza.test.table.TestTableData.Profile;
 import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 
-public class TestLocalTableWithSideInputsEndToEnd extends IntegrationTestHarness {
+public class TestLocalTableWithSideInputsEndToEnd {
+  private static final String SYSTEM_NAME = "test";
   private static final String PAGEVIEW_STREAM = "pageview";
   private static final String PROFILE_STREAM = "profile";
+  private static final String PROFILE_TABLE = "profile-table";
   private static final String ENRICHED_PAGEVIEW_STREAM = "enrichedpageview";
+  private static final SystemStream OUTPUT_SYSTEM_STREAM = new SystemStream(SYSTEM_NAME, ENRICHED_PAGEVIEW_STREAM);
 
   @Test
-  public void testJoinWithSideInputsTable() {
+  public void testLowLevelJoinWithSideInputsTable() throws InterruptedException {
+    int partitionCount = 4;
+    IntegerSerde integerSerde = new IntegerSerde();
+    // for the low-level API, the page view input needs to be pre-partitioned in the same way that the profiles are partitioned
+    Map<Integer, List<PageView>> pageViewsPartitionedByMemberId =
+        TestTableData.generatePartitionedPageViews(20, partitionCount)
+            .values()
+            .stream()
+            .flatMap(List::stream)
+            .collect(Collectors.groupingBy(
+              pageView -> Math.abs(Arrays.hashCode(integerSerde.toBytes(pageView.getMemberId()))) % partitionCount));
     runTest(
-        "test",
+        new LowLevelPageViewProfileJoin(),
+        pageViewsPartitionedByMemberId,
+        TestTableData.generatePartitionedProfiles(10, partitionCount));
+  }
+
+  @Test
+  public void testJoinWithSideInputsTable() throws InterruptedException {
+    runTest(
         new PageViewProfileJoin(),
-        Arrays.asList(TestTableData.generatePageViews(10)),
-        Arrays.asList(TestTableData.generateProfiles(10)));
+        TestTableData.generatePartitionedPageViews(20, 4),
+        TestTableData.generatePartitionedProfiles(10, 4));
   }
 
   @Test
-  public void testJoinWithDurableSideInputTable() {
+  public void testJoinWithDurableSideInputTable() throws InterruptedException {
     runTest(
-        "test",
         new DurablePageViewProfileJoin(),
-        Arrays.asList(TestTableData.generatePageViews(5)),
-        Arrays.asList(TestTableData.generateProfiles(5)));
+        TestTableData.generatePartitionedPageViews(20, 4),
+        TestTableData.generatePartitionedProfiles(10, 4));
   }
 
-  private void runTest(String systemName, StreamApplication app, List<PageView> pageViews,
-      List<Profile> profiles) {
-    Map<String, String> configs = new HashMap<>();
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PAGEVIEW_STREAM), systemName);
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PROFILE_STREAM), systemName);
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, ENRICHED_PAGEVIEW_STREAM), systemName);
-
-    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
-
+  private <T extends ApplicationDescriptor<?>> void runTest(SamzaApplication<T> app,
+      Map<Integer, List<PageView>> pageViews,
+      Map<Integer, List<Profile>> profiles) throws InterruptedException {
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
     InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
-        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<PageView>());
-
+        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
     InMemoryInputDescriptor<Profile> profileStreamDesc = isd
-        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<Profile>());
-
+        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
     InMemoryOutputDescriptor<EnrichedPageView> outputStreamDesc = isd
-        .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<EnrichedPageView>());
+        .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>());
 
-    TestRunner
-        .of(app)
+    TestRunner.of(app)
         .addInputStream(pageViewStreamDesc, pageViews)
         .addInputStream(profileStreamDesc, profiles)
         .addOutputStream(outputStreamDesc, 1)
-        .addConfig(new MapConfig(configs))
-        .run(Duration.ofMillis(100000));
+        .run(Duration.ofSeconds(10));
 
-    try {
-      Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
-      List<EnrichedPageView> results = result.values().stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
+    List<EnrichedPageView> expectedEnrichedPageViews = buildExpectedEnrichedPageViews(pageViews, profiles);
+    StreamAssert.containsInAnyOrder(expectedEnrichedPageViews, outputStreamDesc, Duration.ofSeconds(1));
+  }
 
-      List<EnrichedPageView> expectedEnrichedPageviews = pageViews.stream()
-          .flatMap(pv -> profiles.stream()
-              .filter(profile -> pv.memberId == profile.memberId)
-              .map(profile -> new EnrichedPageView(pv.pageKey, profile.memberId, profile.company)))
-          .collect(Collectors.toList());
+  private static List<EnrichedPageView> buildExpectedEnrichedPageViews(Map<Integer, List<PageView>> pageViews,
+      Map<Integer, List<Profile>> profiles) {
+    ImmutableMap.Builder<Integer, Profile> profilesByMemberIdBuilder = new ImmutableMap.Builder<>();
+    profiles.values()
+        .stream()
+        .flatMap(List::stream)
+        .forEach(profile -> profilesByMemberIdBuilder.put(profile.getMemberId(), profile));
+    Map<Integer, Profile> profilesByMemberId = profilesByMemberIdBuilder.build();
+    ImmutableList.Builder<EnrichedPageView> enrichedPageViewsBuilder = new ImmutableList.Builder<>();
+    pageViews.values()
+        .stream()
+        .flatMap(List::stream)
+        .forEach(pageView -> Optional.ofNullable(profilesByMemberId.get(pageView.getMemberId()))
+            .ifPresent(profile -> enrichedPageViewsBuilder.add(
+                new EnrichedPageView(pageView.getPageKey(), profile.getMemberId(), profile.getCompany()))));
+    return enrichedPageViewsBuilder.build();
+  }
 
-      boolean successfulJoin = results.stream().allMatch(expectedEnrichedPageviews::contains);
-      assertEquals("Mismatch between the expected and actual join count", expectedEnrichedPageviews.size(), results.size());
-      assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin);
-    } catch (SamzaException e) {
-      e.printStackTrace();
+  static class LowLevelPageViewProfileJoin implements TaskApplication {
+    @Override
+    public void describe(TaskApplicationDescriptor appDescriptor) {
+      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      appDescriptor.withInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>()));
+      appDescriptor.withInputStream(sd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>()));
+
+      TableDescriptor<Integer, Profile, ?> tableDescriptor = new InMemoryTableDescriptor<>(PROFILE_TABLE,
+          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())).withSideInputs(ImmutableList.of(PROFILE_STREAM))
+          .withSideInputsProcessor((msg, store) -> {
+            Profile profile = (Profile) msg.getMessage();
+            int key = profile.getMemberId();
+            return ImmutableList.of(new Entry<>(key, profile));
+          });
+      appDescriptor.withTable(tableDescriptor);
+
+      appDescriptor.withOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>()));
+
+      appDescriptor.withTaskFactory((StreamTaskFactory) PageViewProfileJoinStreamTask::new);
+    }
+  }
+
+  static class PageViewProfileJoinStreamTask implements InitableTask, StreamTask {
+    private ReadWriteTable<Integer, Profile> profileTable;
+
+    @Override
+    public void init(Context context) {
+      this.profileTable = context.getTaskContext().getTable(PROFILE_TABLE);
+    }
+
+    @Override
+    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+      PageView pageView = (PageView) envelope.getMessage();
+      Profile profile = this.profileTable.get(pageView.getMemberId());
+      if (profile != null) {
+        EnrichedPageView enrichedPageView =
+            new EnrichedPageView(pageView.getPageKey(), profile.getMemberId(), profile.getCompany());
+        collector.send(new OutgoingMessageEnvelope(OUTPUT_SYSTEM_STREAM, enrichedPageView));
+      }
     }
   }
 
   static class PageViewProfileJoin implements StreamApplication {
-    static final String PROFILE_TABLE = "profile-table";
-
     @Override
     public void describe(StreamApplicationDescriptor appDescriptor) {
       Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(getTableDescriptor());
-      KafkaSystemDescriptor sd =
-          new KafkaSystemDescriptor("test");
+      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor(SYSTEM_NAME);
       appDescriptor.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
-          .partitionBy(TestTableData.PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "partition-page-view")
+          .partitionBy(TestTableData.PageView::getMemberId, v -> v,
+              KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "partition-page-view")
           .join(table, new PageViewToProfileJoinFunction())
           .sendTo(appDescriptor.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
     }
 
     protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
-      return new InMemoryTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+      return new InMemoryTableDescriptor<>(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
           .withSideInputs(ImmutableList.of(PROFILE_STREAM))
           .withSideInputsProcessor((msg, store) -> {
             Profile profile = (Profile) msg.getMessage();
@@ -153,7 +215,7 @@
   static class DurablePageViewProfileJoin extends PageViewProfileJoin {
     @Override
     protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
-      return new RocksDbTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+      return new RocksDbTableDescriptor<>(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
           .withSideInputs(ImmutableList.of(PROFILE_STREAM))
           .withSideInputsProcessor((msg, store) -> {
             TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
@@ -162,4 +224,4 @@
           });
     }
   }
-}
\ No newline at end of file
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
index 76c56b0..39f9b02 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
@@ -20,19 +20,29 @@
 package org.apache.samza.test.table;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerdeFactory;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
 
 
 public class TestTableData {
+  private static final IntegerSerde INTEGER_SERDE = new IntegerSerde();
 
   public static class PageView implements Serializable {
     @JsonProperty("pageKey")
@@ -205,6 +215,35 @@
     return pageviews;
   }
 
+  /**
+   * Creates page views and spreads page views with the same member id across different partitions.
+   * Member ids are spread out like this to make sure that partitionBy operators properly repartition the messages.
+   * Member ids are assigned round-robin from [0, 10).
+   *
+   * Example:
+   * generatePartitionedPageViews(20, 4) will return:
+   * 0 -> page views with member ids [0, 5)
+   * 1 -> page views with member ids [5, 10)
+   * 2 -> page views with member ids [0, 5)
+   * 3 -> page views with member ids [5, 10)
+   */
+  public static Map<Integer, List<PageView>> generatePartitionedPageViews(int numPageViews, int partitionCount) {
+    Preconditions.checkArgument(numPageViews % partitionCount == 0, "partitionCount must divide numPageViews evenly");
+    int numPerPartition = numPageViews / partitionCount;
+    Random random = new Random();
+    ImmutableMap.Builder<Integer, List<PageView>> pageViewsBuilder = new ImmutableMap.Builder<>();
+    for (int i = 0; i < partitionCount; i++) {
+      pageViewsBuilder.put(i, new ArrayList<>());
+    }
+    Map<Integer, List<PageView>> pageViews = pageViewsBuilder.build();
+    for (int i = 0; i < numPageViews; i++) {
+      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
+      int memberId = i % 10;
+      pageViews.get(i / numPerPartition).add(new PageView(pagekey, memberId));
+    }
+    return pageViews;
+  }
+
   static public PageView[] generatePageViewsWithDistinctKeys(int count) {
     Random random = new Random();
     PageView[] pageviews = new PageView[count];
@@ -227,4 +266,20 @@
     return profiles;
   }
 
+  /**
+   * Creates profiles and partitions them based on the bytes representation of the member id. The bytes
+   * representation is used for partitioning because it must match the partition function of the
+   * InMemorySystemProducer (which is used in the test framework) so that table joins can be tested.
+   * One profile is created for each member id in [0, numProfiles).
+   */
+  public static Map<Integer, List<Profile>> generatePartitionedProfiles(int numProfiles, int partitionCount) {
+    Random random = new Random();
+    return IntStream.range(0, numProfiles)
+        .mapToObj(i -> {
+          String company = COMPANIES[random.nextInt(COMPANIES.length - 1)];
+          return new Profile(i, company);
+        })
+        .collect(Collectors.groupingBy(
+          profile -> Math.abs(Arrays.hashCode(INTEGER_SERDE.toBytes(profile.getMemberId()))) % partitionCount));
+  }
 }
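
Both generators above (and the low-level side-input test earlier) rely on the same keyed partitioning: hash the serialized key bytes and take it modulo the partition count. A small hedged sketch of that assignment follows, assuming this mirrors how the in-memory test system routes keyed messages; the class and method names are illustrative only.

import java.util.Arrays;
import org.apache.samza.serializers.IntegerSerde;

public class PartitionAssignmentSketch {
  private static final IntegerSerde INTEGER_SERDE = new IntegerSerde();

  // Same expression used by generatePartitionedProfiles and the low-level side-input test:
  // hash of the serialized key bytes, made non-negative, modulo the partition count.
  static int partitionFor(int memberId, int partitionCount) {
    return Math.abs(Arrays.hashCode(INTEGER_SERDE.toBytes(memberId))) % partitionCount;
  }

  public static void main(String[] args) {
    // With 4 partitions, a profile and the pre-partitioned page views for the same member id
    // land in the same partition, so the stream-table join sees matching keys locally.
    for (int memberId = 0; memberId < 10; memberId++) {
      System.out.println("member " + memberId + " -> partition " + partitionFor(memberId, 4));
    }
  }
}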
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index b85e3c5..fb540f7 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -41,12 +41,12 @@
   val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging {
 
   private val metricsConfig = new MetricsConfig(config)
-  val containersFromPreviousAttempts = newCounter("container-from-previous-attempt")
+  val containersFromPreviousAttempts = newGauge("container-from-previous-attempt", 0L)
   val reporters = MetricsReporterLoader.getMetricsReporters(metricsConfig, SamzaAppMasterMetrics.sourceName).asScala
   reporters.values.foreach(_.register(SamzaAppMasterMetrics.sourceName, registry))
 
   def setContainersFromPreviousAttempts(containerCount: Int) {
-    containersFromPreviousAttempts.inc(containerCount)
+    containersFromPreviousAttempts.set(containerCount)
   }
 
   def start() {
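
The metric change above swaps an incrementing counter for a gauge that is set to the latest value. A tiny sketch of why that matters, under the assumption (for this example only) that setContainersFromPreviousAttempts can be invoked more than once per attempt: the counter keeps accumulating across calls, while the gauge always reports the current count.

import java.util.concurrent.atomic.AtomicLong;

public class CounterVsGaugeSketch {
  private static final AtomicLong COUNTER = new AtomicLong(); // accumulates across calls
  private static final AtomicLong GAUGE = new AtomicLong();   // holds only the latest value

  static void report(int containersFromPreviousAttempt) {
    COUNTER.addAndGet(containersFromPreviousAttempt); // counter.inc(n): 3, then 6, then 9 ...
    GAUGE.set(containersFromPreviousAttempt);         // gauge.set(n): stays at 3
  }

  public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
      report(3);
    }
    System.out.println("counter=" + COUNTER.get() + " gauge=" + GAUGE.get()); // counter=9 gauge=3
  }
}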
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 7e4565b..237667d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -19,14 +19,11 @@
 
 package org.apache.samza.job.yarn
 
-import java.lang.Boolean
-
 import com.google.common.annotations.VisibleForTesting
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.samza.SamzaException
-import org.apache.samza.classloader.DependencyIsolationUtils
 import org.apache.samza.config.{Config, JobConfig, ShellCommandConfig, YarnConfig}
 import org.apache.samza.job.ApplicationStatus.{SuccessfulFinish, UnsuccessfulFinish}
 import org.apache.samza.job.{ApplicationStatus, StreamJob}
@@ -46,7 +43,7 @@
   def submit: YarnJob = {
     try {
       val jobConfig = new JobConfig(config)
-      val cmdExec = YarnJob.buildJobCoordinatorCmd(config, jobConfig)
+      val cmdExec = "./__package/bin/run-jc.sh"
       val environment = YarnJob.buildEnvironment(config, this.yarnConfig, jobConfig)
 
       appId = client.submitApplication(
@@ -184,13 +181,6 @@
         Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig))
     }
     envMapBuilder += ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(yarnConfig.getAmOpts)
-    val splitDeploymentEnabled = jobConfig.isSplitDeploymentEnabled
-    envMapBuilder += ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED -> Util.envVarEscape(Boolean.toString(splitDeploymentEnabled))
-    if (splitDeploymentEnabled) {
-      //split deployment is enabled, so need to specify where the application lib directory is for app resources
-      envMapBuilder += ShellCommandConfig.ENV_APPLICATION_LIB_DIR ->
-        Util.envVarEscape(String.format("./%s/lib", DependencyIsolationUtils.APPLICATION_DIRECTORY))
-    }
     Option.apply(yarnConfig.getAMJavaHome).foreach {
       amJavaHome => envMapBuilder += ShellCommandConfig.ENV_JAVA_HOME -> amJavaHome
     }
@@ -198,18 +188,4 @@
       Util.envVarEscape(config.get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, ""))
     envMapBuilder.result()
   }
-
-  /**
-    * Build the command for the job coordinator execution.
-    * Passing multiple separate config objects so that they can be reused in other places.
-    */
-  @VisibleForTesting
-  private[yarn] def buildJobCoordinatorCmd(config: Config, jobConfig: JobConfig): String = {
-    var cmdExec = "./__package/bin/run-jc.sh" // default location
-    if (jobConfig.isSplitDeploymentEnabled) {
-      cmdExec = "./%s/bin/run-jc.sh" format DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY
-      logger.info("Using isolated cluster-based job coordinator path: %s" format cmdExec)
-    }
-    cmdExec
-  }
-}
\ No newline at end of file
+}
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
index f068800..4e2c4a7 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.util.Map;
 import com.google.common.collect.ImmutableMap;
-import org.apache.samza.classloader.DependencyIsolationUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -38,19 +37,6 @@
 
 public class TestYarnJob {
   @Test
-  public void testBuildJobCoordinatorCmd() {
-    // cluster-based job coordinator dependency isolation is not enabled; use script from __package directory
-    Config config = new MapConfig();
-    assertEquals("./__package/bin/run-jc.sh", YarnJob$.MODULE$.buildJobCoordinatorCmd(config, new JobConfig(config)));
-
-    // split deployment is enabled; use script from framework infrastructure directory
-    Config splitDeploymentEnabled =
-        new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
-    assertEquals(String.format("./%s/bin/run-jc.sh", DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY),
-        YarnJob$.MODULE$.buildJobCoordinatorCmd(splitDeploymentEnabled, new JobConfig(splitDeploymentEnabled)));
-  }
-
-  @Test
   public void testBuildEnvironment() throws IOException {
     String amJvmOptions = "-Xmx1g -Dconfig.key='config value'";
     Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
@@ -58,35 +44,12 @@
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
         .put(YarnConfig.AM_JVM_OPTIONS, amJvmOptions) // needs escaping
-        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
         .build());
     String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
         .writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
         ShellCommandConfig.ENV_JAVA_OPTS, Util.envVarEscape(amJvmOptions),
-        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false",
-        ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
-    assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
-        YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
-  }
-
-  @Test
-  public void testBuildEnvironmentJobCoordinatorDependencyIsolationEnabled() throws IOException {
-    Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
-        .put(JobConfig.JOB_NAME, "jobName")
-        .put(JobConfig.JOB_ID, "jobId")
-        .put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
-        .put(YarnConfig.AM_JVM_OPTIONS, "")
-        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
-        .build());
-    String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
-        .writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
-    Map<String, String> expected = ImmutableMap.of(
-        ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
-        ShellCommandConfig.ENV_JAVA_OPTS, "",
-        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
-        ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
         ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
@@ -99,7 +62,6 @@
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
         .put(YarnConfig.AM_JVM_OPTIONS, "")
-        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
         .put(YarnConfig.AM_JAVA_HOME, "/some/path/to/java/home")
         .build());
     String expectedCoordinatorStreamConfigStringValue = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
@@ -107,7 +69,6 @@
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue,
         ShellCommandConfig.ENV_JAVA_OPTS, "",
-        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false",
         ShellCommandConfig.ENV_JAVA_HOME, "/some/path/to/java/home",
         ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
@@ -121,15 +82,12 @@
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory")
         .put(YarnConfig.AM_JVM_OPTIONS, "")
-        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
         .build());
     String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
         .writeValueAsString(config));
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
         ShellCommandConfig.ENV_JAVA_OPTS, "",
-        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
-        ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
         ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
@@ -142,7 +100,6 @@
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory")
         .put(YarnConfig.AM_JVM_OPTIONS, "")
-        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
         .put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "./sqlapp/lib/*")
         .build());
     String expectedSubmissionConfig = Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
@@ -150,8 +107,6 @@
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
         ShellCommandConfig.ENV_JAVA_OPTS, "",
-        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
-        ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib",
         ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "./sqlapp/lib/*");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());