Merge branch 'NewConsumer2' of https://github.com/sborya/samza into NewKafkaSystemConsumer
diff --git a/.travis.yml b/.travis.yml
index 2a3ae0c..601ceac 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -30,8 +30,8 @@
   - oraclejdk8
 
 script:
-  ## travis_wait increases build idle-wait time from 10 minutes to 20 minutes.
-  - travis_wait 20 ./gradlew clean build
+  ## travis_wait increases build idle-wait time from 10 minutes to 30 minutes.
+  - travis_wait 30 ./gradlew clean build
   - type sonar-scanner &>/dev/null; if [ $? -eq 0 ]; then sonar-scanner; else echo "Not running sonar"; fi
 
 before_cache:
diff --git a/build.gradle b/build.gradle
index 7d23717..8b68205 100644
--- a/build.gradle
+++ b/build.gradle
@@ -184,6 +184,7 @@
     compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
+    compile "net.jodah:failsafe:$failsafeVersion"
     testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 3c4c3a9..b33ab82 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -45,4 +45,5 @@
   yarnVersion = "2.6.1"
   zkClientVersion = "0.8"
   zookeeperVersion = "3.4.6"
+  failsafeVersion = "1.1.0"
 }
diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index c847088..cb5d5c0 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -56,7 +56,6 @@
    */
   public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";
   public static final String CLUSTER_MANAGER_HOST_AFFINITY_ENABLED = "job.host-affinity.enabled";
-  private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false;
 
   /**
    * Number of CPU cores to request from the cluster manager per container
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index ea892fe..810f424 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -21,6 +21,7 @@
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,7 +39,6 @@
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
 import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,14 +54,16 @@
 public class ExecutionPlanner {
   private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
 
-  static final int MAX_INFERRED_PARTITIONS = 256;
+  /* package private */ static final int MAX_INFERRED_PARTITIONS = 256;
 
   private final Config config;
+  private final StreamConfig streamConfig;
   private final StreamManager streamManager;
 
   public ExecutionPlanner(Config config, StreamManager streamManager) {
     this.config = config;
     this.streamManager = streamManager;
+    this.streamConfig = new StreamConfig(config);
   }
 
   public ExecutionPlan plan(OperatorSpecGraph specGraph) {
@@ -71,12 +73,10 @@
     JobGraph jobGraph = createJobGraph(specGraph);
 
     // fetch the external streams partition info
-    updateExistingPartitions(jobGraph, streamManager);
+    fetchInputAndOutputStreamPartitions(jobGraph, streamManager);
 
-    if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
-      // figure out the partitions for internal streams
-      calculatePartitions(jobGraph);
-    }
+    // figure out the partitions for internal streams
+    calculatePartitions(jobGraph);
 
     return jobGraph;
   }
@@ -85,9 +85,9 @@
     ApplicationConfig appConfig = new ApplicationConfig(config);
     ClusterManagerConfig clusterConfig = new ClusterManagerConfig(config);
     // currently we don't support host-affinity in batch mode
-    if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH
-        && clusterConfig.getHostAffinityEnabled()) {
-      throw new SamzaException("Host affinity is not supported in batch mode. Please configure job.host-affinity.enabled=false.");
+    if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH && clusterConfig.getHostAffinityEnabled()) {
+      throw new SamzaException(String.format("Host affinity is not supported in batch mode. Please configure %s=false.",
+          ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED));
     }
   }
 
@@ -96,30 +96,33 @@
    */
   /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
     JobGraph jobGraph = new JobGraph(config, specGraph);
-    StreamConfig streamConfig = new StreamConfig(config);
+
+    // Source streams contain both input and intermediate streams.
     Set<StreamSpec> sourceStreams = getStreamSpecs(specGraph.getInputOperators().keySet(), streamConfig);
+    // Sink streams contain both output and intermediate streams.
     Set<StreamSpec> sinkStreams = getStreamSpecs(specGraph.getOutputStreams().keySet(), streamConfig);
-    Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
-    Set<TableSpec> tables = new HashSet<>(specGraph.getTables().keySet());
-    intStreams.retainAll(sinkStreams);
-    sourceStreams.removeAll(intStreams);
-    sinkStreams.removeAll(intStreams);
+
+    Set<StreamSpec> intermediateStreams = Sets.intersection(sourceStreams, sinkStreams);
+    Set<StreamSpec> inputStreams = Sets.difference(sourceStreams, intermediateStreams);
+    Set<StreamSpec> outputStreams = Sets.difference(sinkStreams, intermediateStreams);
+
+    Set<TableSpec> tables = specGraph.getTables().keySet();
 
     // For this phase, we have a single job node for the whole dag
     String jobName = config.get(JobConfig.JOB_NAME());
     String jobId = config.get(JobConfig.JOB_ID(), "1");
     JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId);
 
-    // add sources
-    sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
+    // Add input streams
+    inputStreams.forEach(spec -> jobGraph.addInputStream(spec, node));
 
-    // add sinks
-    sinkStreams.forEach(spec -> jobGraph.addSink(spec, node));
+    // Add output streams
+    outputStreams.forEach(spec -> jobGraph.addOutputStream(spec, node));
 
-    // add intermediate streams
-    intStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
+    // Add intermediate streams
+    intermediateStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
 
-    // add tables
+    // Add tables
     tables.forEach(spec -> jobGraph.addTable(spec, node));
 
     jobGraph.validate();
@@ -132,71 +135,80 @@
    */
   /* package private */ void calculatePartitions(JobGraph jobGraph) {
     // calculate the partitions for the input streams of join operators
-    calculateJoinInputPartitions(jobGraph, config);
+    calculateJoinInputPartitions(jobGraph, streamConfig);
 
     // calculate the partitions for the rest of intermediate streams
-    calculateIntStreamPartitions(jobGraph, config);
+    calculateIntermediateStreamPartitions(jobGraph, config);
 
     // validate all the partitions are assigned
-    validatePartitions(jobGraph);
+    validateIntermediateStreamPartitions(jobGraph);
   }
 
   /**
-   * Fetch the partitions of source/sink streams and update the StreamEdges.
+   * Fetch the partitions of input/output streams and update the corresponding StreamEdges.
    * @param jobGraph {@link JobGraph}
    * @param streamManager the {@link StreamManager} to interface with the streams.
    */
-  /* package private */ static void updateExistingPartitions(JobGraph jobGraph, StreamManager streamManager) {
+  /* package private */ static void fetchInputAndOutputStreamPartitions(JobGraph jobGraph, StreamManager streamManager) {
     Set<StreamEdge> existingStreams = new HashSet<>();
-    existingStreams.addAll(jobGraph.getSources());
-    existingStreams.addAll(jobGraph.getSinks());
+    existingStreams.addAll(jobGraph.getInputStreams());
+    existingStreams.addAll(jobGraph.getOutputStreams());
 
+    // System to StreamEdges
     Multimap<String, StreamEdge> systemToStreamEdges = HashMultimap.create();
-    // group the StreamEdge(s) based on the system name
-    existingStreams.forEach(streamEdge -> {
-        SystemStream systemStream = streamEdge.getSystemStream();
-        systemToStreamEdges.put(systemStream.getSystem(), streamEdge);
-      });
-    for (Map.Entry<String, Collection<StreamEdge>> entry : systemToStreamEdges.asMap().entrySet()) {
-      String systemName = entry.getKey();
-      Collection<StreamEdge> streamEdges = entry.getValue();
+
+    // Group StreamEdges by system
+    for (StreamEdge streamEdge : existingStreams) {
+      String system = streamEdge.getSystemStream().getSystem();
+      systemToStreamEdges.put(system, streamEdge);
+    }
+
+    // Fetch partition count for every set of StreamEdges belonging to a particular system.
+    for (String system : systemToStreamEdges.keySet()) {
+      Collection<StreamEdge> streamEdges = systemToStreamEdges.get(system);
+
+      // Map every stream to its corresponding StreamEdge so we can retrieve a StreamEdge given its stream.
       Map<String, StreamEdge> streamToStreamEdge = new HashMap<>();
-      // create the stream name to StreamEdge mapping for this system
-      streamEdges.forEach(streamEdge -> streamToStreamEdge.put(streamEdge.getSystemStream().getStream(), streamEdge));
-      // retrieve the partition counts for the streams in this system
-      Map<String, Integer> streamToPartitionCount = streamManager.getStreamPartitionCounts(systemName, streamToStreamEdge.keySet());
-      // set the partitions of a stream to its StreamEdge
-      streamToPartitionCount.forEach((stream, partitionCount) -> {
-          streamToStreamEdge.get(stream).setPartitionCount(partitionCount);
-          log.info("Partition count is {} for stream {}", partitionCount, stream);
-        });
+      for (StreamEdge streamEdge : streamEdges) {
+        streamToStreamEdge.put(streamEdge.getSystemStream().getStream(), streamEdge);
+      }
+
+      // Retrieve partition counts for the streams of this system.
+      Set<String> streams = streamToStreamEdge.keySet();
+      Map<String, Integer> streamToPartitionCount = streamManager.getStreamPartitionCounts(system, streams);
+
+      // Retrieve StreamEdge corresponding to every stream and set partition count on it.
+      for (Map.Entry<String, Integer> entry : streamToPartitionCount.entrySet()) {
+        String stream = entry.getKey();
+        Integer partitionCount = entry.getValue();
+        streamToStreamEdge.get(stream).setPartitionCount(partitionCount);
+        log.info("Fetched partition count value {} for stream {}", partitionCount, stream);
+      }
     }
   }
 
   /**
    * Calculate the partitions for the input streams of join operators
    */
-  /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph, Config config) {
+  /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph, StreamConfig streamConfig) {
     // mapping from a source stream to all join specs reachable from it
-    Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
+    Multimap<JoinOperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
     // reverse mapping of the above
-    Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs = HashMultimap.create();
+    Multimap<StreamEdge, JoinOperatorSpec> streamEdgeToJoinSpecs = HashMultimap.create();
     // A queue of joins with known input partitions
-    Queue<OperatorSpec> joinQ = new LinkedList<>();
+    Queue<JoinOperatorSpec> joinQ = new LinkedList<>();
     // The visited set keeps track of the join specs that have been already inserted in the queue before
-    Set<OperatorSpec> visited = new HashSet<>();
+    Set<JoinOperatorSpec> visited = new HashSet<>();
 
-    StreamConfig streamConfig = new StreamConfig(config);
-
-    jobGraph.getSpecGraph().getInputOperators().forEach((key, value) -> {
-        StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(key, streamConfig));
+    jobGraph.getSpecGraph().getInputOperators().forEach((streamId, inputOperatorSpec) -> {
+        StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(streamId, streamConfig));
         // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
-        findReachableJoins(value, streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
+        findReachableJoins(inputOperatorSpec, streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
       });
 
     // At this point, joinQ contains joinSpecs where at least one of the input stream edge partitions is known.
     while (!joinQ.isEmpty()) {
-      OperatorSpec join = joinQ.poll();
+      JoinOperatorSpec join = joinQ.poll();
       int partitions = StreamEdge.PARTITIONS_UNKNOWN;
       // loop through the input streams to the join and find the partition count
       for (StreamEdge edge : joinSpecToStreamEdges.get(join)) {
@@ -223,7 +235,7 @@
           edge.setPartitionCount(partitions);
 
           // find other joins can be inferred by setting this edge
-          for (OperatorSpec op : streamEdgeToJoinSpecs.get(edge)) {
+          for (JoinOperatorSpec op : streamEdgeToJoinSpecs.get(edge)) {
             if (!visited.contains(op)) {
               joinQ.add(op);
               visited.add(op);
@@ -244,17 +256,19 @@
    * @param joinQ queue that contains joinSpecs where at least one of the input stream edge partitions is known.
    */
   private static void findReachableJoins(OperatorSpec operatorSpec, StreamEdge sourceStreamEdge,
-      Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges,
-      Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs,
-      Queue<OperatorSpec> joinQ, Set<OperatorSpec> visited) {
-    if (operatorSpec instanceof JoinOperatorSpec) {
-      joinSpecToStreamEdges.put(operatorSpec, sourceStreamEdge);
-      streamEdgeToJoinSpecs.put(sourceStreamEdge, operatorSpec);
+      Multimap<JoinOperatorSpec, StreamEdge> joinSpecToStreamEdges,
+      Multimap<StreamEdge, JoinOperatorSpec> streamEdgeToJoinSpecs,
+      Queue<JoinOperatorSpec> joinQ, Set<JoinOperatorSpec> visited) {
 
-      if (!visited.contains(operatorSpec) && sourceStreamEdge.getPartitionCount() > 0) {
+    if (operatorSpec instanceof JoinOperatorSpec) {
+      JoinOperatorSpec joinOperatorSpec = (JoinOperatorSpec) operatorSpec;
+      joinSpecToStreamEdges.put(joinOperatorSpec, sourceStreamEdge);
+      streamEdgeToJoinSpecs.put(sourceStreamEdge, joinOperatorSpec);
+
+      if (!visited.contains(joinOperatorSpec) && sourceStreamEdge.getPartitionCount() > 0) {
         // put the joins with known input partitions into the queue and mark as visited
-        joinQ.add(operatorSpec);
-        visited.add(operatorSpec);
+        joinQ.add(joinOperatorSpec);
+        visited.add(joinOperatorSpec);
       }
     }
 
@@ -265,15 +279,16 @@
     }
   }
 
-  private static void calculateIntStreamPartitions(JobGraph jobGraph, Config config) {
-    int partitions = config.getInt(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), StreamEdge.PARTITIONS_UNKNOWN);
-    if (partitions < 0) {
+  private static void calculateIntermediateStreamPartitions(JobGraph jobGraph, Config config) {
+    final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS();
+    int partitions = config.getInt(defaultPartitionsConfigProperty, StreamEdge.PARTITIONS_UNKNOWN);
+    if (partitions == StreamEdge.PARTITIONS_UNKNOWN) {
       // use the following simple algo to figure out the partitions
       // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
       // partition will be further bounded by MAX_INFERRED_PARTITIONS.
       // This is important when running in hadoop where an HDFS input can have lots of files (partitions).
-      int maxInPartitions = maxPartition(jobGraph.getSources());
-      int maxOutPartitions = maxPartition(jobGraph.getSinks());
+      int maxInPartitions = maxPartitions(jobGraph.getInputStreams());
+      int maxOutPartitions = maxPartitions(jobGraph.getOutputStreams());
       partitions = Math.max(maxInPartitions, maxOutPartitions);
 
       if (partitions > MAX_INFERRED_PARTITIONS) {
@@ -281,7 +296,17 @@
         log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.",
             partitions, MAX_INFERRED_PARTITIONS));
       }
+    } else {
+      // Reject zero or negative values explicitly specified in config.
+      if (partitions <= 0) {
+        throw new SamzaException(String.format("Invalid value %d specified for config property %s", partitions,
+            defaultPartitionsConfigProperty));
+      }
+
+      log.info("Using partition count value {} specified for config property {}", partitions,
+          defaultPartitionsConfigProperty);
     }
+
     for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {
         log.info("Set the partition count for intermediate stream {} to {}.", edge.getName(), partitions);
@@ -290,16 +315,15 @@
     }
   }
 
-  private static void validatePartitions(JobGraph jobGraph) {
+  private static void validateIntermediateStreamPartitions(JobGraph jobGraph) {
     for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {
-        throw new SamzaException(String.format("Failure to assign the partitions to Stream %s", edge.getName()));
+        throw new SamzaException(String.format("Failed to assign valid partition count to Stream %s", edge.getName()));
       }
     }
   }
 
-  /* package private */ static int maxPartition(Collection<StreamEdge> edges) {
-    return edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).orElse(StreamEdge.PARTITIONS_UNKNOWN);
+  /* package private */ static int maxPartitions(Collection<StreamEdge> edges) {
+    return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN);
   }
-
 }
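
The inference rule in calculateIntermediateStreamPartitions is easiest to see with concrete numbers. A minimal sketch with hypothetical partition counts (MAX_INFERRED_PARTITIONS is package private, so this only type-checks inside org.apache.samza.execution):

    // Hypothetical counts: input streams with 32 and 64 partitions, output stream with 16.
    int maxInPartitions = Math.max(32, 64);                        // 64
    int maxOutPartitions = 16;
    int partitions = Math.max(maxInPartitions, maxOutPartitions);  // 64 -> used for all intermediate streams
    // An HDFS input with 10,000 files would produce 10,000 here and then be capped:
    partitions = Math.min(partitions, ExecutionPlanner.MAX_INFERRED_PARTITIONS);  // 256
    // Explicitly configuring JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS() to zero or a
    // negative value now fails fast with a SamzaException instead of being used silently.
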
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index f49e6db..5b19095 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -54,8 +54,8 @@
 
   private final Map<String, JobNode> nodes = new HashMap<>();
   private final Map<String, StreamEdge> edges = new HashMap<>();
-  private final Set<StreamEdge> sources = new HashSet<>();
-  private final Set<StreamEdge> sinks = new HashSet<>();
+  private final Set<StreamEdge> inputStreams = new HashSet<>();
+  private final Set<StreamEdge> outputStreams = new HashSet<>();
   private final Set<StreamEdge> intermediateStreams = new HashSet<>();
   private final Set<TableSpec> tables = new HashSet<>();
   private final Config config;
@@ -115,26 +115,26 @@
 
   /**
    * Add a source stream to a {@link JobNode}
-   * @param input source stream
-   * @param node the job node that consumes from the source
+   * @param streamSpec input stream
+   * @param node the job node that consumes from the input stream
    */
-  void addSource(StreamSpec input, JobNode node) {
-    StreamEdge edge = getOrCreateStreamEdge(input);
+  void addInputStream(StreamSpec streamSpec, JobNode node) {
+    StreamEdge edge = getOrCreateStreamEdge(streamSpec);
     edge.addTargetNode(node);
     node.addInEdge(edge);
-    sources.add(edge);
+    inputStreams.add(edge);
   }
 
   /**
-   * Add a sink stream to a {@link JobNode}
-   * @param output sink stream
-   * @param node the job node that outputs to the sink
+   * Add an output stream to a {@link JobNode}
+   * @param streamSpec output stream
+   * @param node the job node that outputs to the output stream
    */
-  void addSink(StreamSpec output, JobNode node) {
-    StreamEdge edge = getOrCreateStreamEdge(output);
+  void addOutputStream(StreamSpec streamSpec, JobNode node) {
+    StreamEdge edge = getOrCreateStreamEdge(streamSpec);
     edge.addSourceNode(node);
     node.addOutEdge(edge);
-    sinks.add(edge);
+    outputStreams.add(edge);
   }
 
   /**
@@ -204,19 +204,19 @@
   }
 
   /**
-   * Returns the source streams in the graph
+   * Returns the input streams in the graph
    * @return unmodifiable set of {@link StreamEdge}
    */
-  Set<StreamEdge> getSources() {
-    return Collections.unmodifiableSet(sources);
+  Set<StreamEdge> getInputStreams() {
+    return Collections.unmodifiableSet(inputStreams);
   }
 
   /**
-   * Return the sink streams in the graph
+   * Return the output streams in the graph
    * @return unmodifiable set of {@link StreamEdge}
    */
-  Set<StreamEdge> getSinks() {
-    return Collections.unmodifiableSet(sinks);
+  Set<StreamEdge> getOutputStreams() {
+    return Collections.unmodifiableSet(outputStreams);
   }
 
   /**
@@ -236,22 +236,22 @@
   }
 
   /**
-   * Validate the graph has the correct topology, meaning the sources are coming from external streams,
-   * sinks are going to external streams, and the nodes are connected with intermediate streams.
-   * Also validate all the nodes are reachable from the sources.
+   * Validate the graph has the correct topology, meaning input streams come from external streams,
+   * output streams go to external streams, and the nodes are connected with intermediate streams.
+   * Also validate that all the nodes are reachable from the input streams.
    */
   void validate() {
-    validateSources();
-    validateSinks();
+    validateInputStreams();
+    validateOutputStreams();
     validateInternalStreams();
     validateReachability();
   }
 
   /**
-   * Validate the sources should have indegree being 0 and outdegree greater than 0
+   * Validate that the input streams have indegree 0 and outdegree greater than 0
    */
-  private void validateSources() {
-    sources.forEach(edge -> {
+  private void validateInputStreams() {
+    inputStreams.forEach(edge -> {
         if (!edge.getSourceNodes().isEmpty()) {
           throw new IllegalArgumentException(
               String.format("Source stream %s should not have producers.", edge.getName()));
@@ -264,10 +264,10 @@
   }
 
   /**
-   * Validate the sinks should have outdegree being 0 and indegree greater than 0
+   * Validate that the output streams have outdegree 0 and indegree greater than 0
    */
-  private void validateSinks() {
-    sinks.forEach(edge -> {
+  private void validateOutputStreams() {
+    outputStreams.forEach(edge -> {
         if (!edge.getTargetNodes().isEmpty()) {
           throw new IllegalArgumentException(
               String.format("Sink stream %s should not have consumers", edge.getName()));
@@ -284,8 +284,8 @@
    */
   private void validateInternalStreams() {
     Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
-    internalEdges.removeAll(sources);
-    internalEdges.removeAll(sinks);
+    internalEdges.removeAll(inputStreams);
+    internalEdges.removeAll(outputStreams);
 
     internalEdges.forEach(edge -> {
         if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
@@ -296,10 +296,10 @@
   }
 
   /**
-   * Validate all nodes are reachable by sources.
+   * Validate all nodes are reachable from the input streams.
    */
   private void validateReachability() {
-    // validate all nodes are reachable from the sources
+    // validate all nodes are reachable from the input streams
     final Set<JobNode> reachable = findReachable();
     if (reachable.size() != nodes.size()) {
       Set<JobNode> unreachable = new HashSet<>(nodes.values());
@@ -317,8 +317,8 @@
     Queue<JobNode> queue = new ArrayDeque<>();
     Set<JobNode> visited = new HashSet<>();
 
-    sources.forEach(source -> {
-        List<JobNode> next = source.getTargetNodes();
+    inputStreams.forEach(input -> {
+        List<JobNode> next = input.getTargetNodes();
         queue.addAll(next);
         visited.addAll(next);
       });
@@ -353,11 +353,11 @@
     pnodes.forEach(node -> {
         String nid = node.getId();
         //only count the degrees of intermediate streams
-        long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
+        long degree = node.getInEdges().stream().filter(e -> !inputStreams.contains(e)).count();
         indegree.put(nid, degree);
 
         if (degree == 0L) {
-          // start from the nodes that has no intermediate input streams, so it only consumes from sources
+          // start from the nodes that have no intermediate input streams, so they only consume from input streams
           q.add(node);
           visited.add(node);
         }
@@ -410,9 +410,9 @@
           q.add(minNode);
           visited.add(minNode);
         } else {
-          // all the remaining nodes should be reachable from sources
-          // start from sources again to find the next node that hasn't been visited
-          JobNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
+          // all the remaining nodes should be reachable from input streams
+          // start from input streams again to find the next node that hasn't been visited
+          JobNode nextNode = inputStreams.stream().flatMap(input -> input.getTargetNodes().stream())
               .filter(node -> !visited.contains(node))
               .findAny().get();
           q.add(nextNode);
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 3a8d5c9..91453d2 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -134,8 +134,8 @@
     jobGraphJson.sinkStreams = new HashMap<>();
     jobGraphJson.intermediateStreams = new HashMap<>();
     jobGraphJson.tables = new HashMap<>();
-    jobGraph.getSources().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams));
-    jobGraph.getSinks().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams));
+    jobGraph.getInputStreams().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams));
+    jobGraph.getOutputStreams().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams));
     jobGraph.getIntermediateStreamEdges().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.intermediateStreams));
     jobGraph.getTables().forEach(t -> buildTableJson(t, jobGraphJson.tables));
 
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index a9f744c..47705ee 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -65,7 +65,6 @@
  */
 public class JobNode {
   private static final Logger log = LoggerFactory.getLogger(JobNode.class);
-  private static final String CONFIG_JOB_PREFIX = "jobs.%s.";
   private static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan";
 
   private final String jobName;
@@ -87,7 +86,7 @@
 
   public static Config mergeJobConfig(Config fullConfig, Config generatedConfig) {
     return new JobConfig(Util.rewriteConfig(extractScopedConfig(
-        fullConfig, generatedConfig, String.format(CONFIG_JOB_PREFIX, new JobConfig(fullConfig).getName().get()))));
+        fullConfig, generatedConfig, String.format(JobConfig.CONFIG_JOB_PREFIX(), new JobConfig(fullConfig).getName().get()))));
   }
 
   public OperatorSpecGraph getSpecGraph() {
@@ -203,7 +202,7 @@
 
     log.info("Job {} has generated configs {}", jobName, configs);
 
-    String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
+    String configPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), jobName);
 
     // Disallow user specified job inputs/outputs. This info comes strictly from the user application.
     Map<String, String> allowedConfigs = new HashMap<>(config);
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 4ef9f9c..7910216 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -428,7 +428,7 @@
     public void afterFailure(Throwable t) {
       containerShutdownLatch.countDown();
       synchronized (lock) {
-        LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), containerException);
+        LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), t);
         state = State.STOPPING;
         containerException = t;
         jobCoordinator.stop();
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index 186b4a8..ae72414 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -37,12 +37,14 @@
 
 /**
  * A {@link TableManager} manages tables within a Samza task. For each table, it maintains
- * the {@link TableSpec} and the {@link TableProvider}. It is used at execution for
- * {@link org.apache.samza.container.TaskInstance} to retrieve table instances for
- * read/write operations.
+ * the {@link TableSpec}, the {@link TableProvider} and the {@link Table} instance.
+ * It is used at execution for {@link org.apache.samza.container.TaskInstance} to retrieve
+ * table instances for read/write operations.
  *
  * A {@link TableManager} is constructed from job configuration, the {@link TableSpec}
- * and {@link TableProvider} are constructed by processing the job configuration.
+ * and {@link TableProvider} are constructed by processing the job configuration
+ * during initialization. The {@link Table} is constructed when {@link #getTable(String)}
+ * is called and cached.
  *
  * After a {@link TableManager} is constructed, local tables are associated with
  * local store instances created during {@link org.apache.samza.container.SamzaContainer}
@@ -51,19 +53,19 @@
  * Method {@link TableManager#getTable(String)} will throw {@link IllegalStateException},
  * if it's called before initialization.
  *
- * For store backed tables, the list of stores must be injected into the constructor.
  */
 public class TableManager {
 
   static public class TableCtx {
     private TableSpec tableSpec;
     private TableProvider tableProvider;
+    private Table table;
   }
 
   private final Logger logger = LoggerFactory.getLogger(TableManager.class.getName());
 
   // tableId -> TableCtx
-  private final Map<String, TableCtx> tables = new HashMap<>();
+  private final Map<String, TableCtx> tableContexts = new HashMap<>();
 
   private boolean initialized;
 
@@ -100,7 +102,7 @@
    */
   public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
     Preconditions.checkNotNull(containerContext, "null container context.");
-    tables.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext));
+    tableContexts.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext));
     initialized = true;
   }
 
@@ -109,7 +111,7 @@
    * @param tableSpec the table spec
    */
   private void addTable(TableSpec tableSpec) {
-    if (tables.containsKey(tableSpec.getId())) {
+    if (tableContexts.containsKey(tableSpec.getId())) {
       throw new SamzaException("Table " + tableSpec.getId() + " already exists");
     }
     TableCtx ctx = new TableCtx();
@@ -117,14 +119,14 @@
         Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
     ctx.tableProvider = tableProviderFactory.getTableProvider(tableSpec);
     ctx.tableSpec = tableSpec;
-    tables.put(tableSpec.getId(), ctx);
+    tableContexts.put(tableSpec.getId(), ctx);
   }
 
   /**
    * Shutdown the table manager, internally it shuts down all tables
    */
   public void close() {
-    tables.values().forEach(ctx -> ctx.tableProvider.close());
+    tableContexts.values().forEach(ctx -> ctx.tableProvider.close());
   }
 
   /**
@@ -133,10 +135,14 @@
    * @return table instance
    */
   public Table getTable(String tableId) {
-    if (!initialized) {
-      throw new IllegalStateException("TableManager has not been initialized.");
+    Preconditions.checkState(initialized, "TableManager has not been initialized.");
+
+    TableCtx ctx = tableContexts.get(tableId);
+    Preconditions.checkNotNull(ctx, "Unknown tableId " + tableId);
+
+    if (ctx.table == null) {
+      ctx.table = ctx.tableProvider.getTable();
     }
-    Preconditions.checkArgument(tables.containsKey(tableId), "Unknown tableId=" + tableId);
-    return tables.get(tableId).tableProvider.getTable();
+    return ctx.table;
   }
 }
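
Because getTable now builds the Table lazily and stores it on the TableCtx, repeated lookups of the same tableId return the same cached instance. A minimal usage sketch against an already constructed TableManager (the table id is illustrative):

    tableManager.init(containerContext, taskContext);        // must happen before any getTable call

    Table first = tableManager.getTable("remote-profiles");  // created via TableProvider.getTable()
    Table second = tableManager.getTable("remote-profiles"); // returned from the cached TableCtx.table
    assert first == second;

    // Calling getTable before init now fails with IllegalStateException (checkState),
    // and an unknown tableId fails with NullPointerException (checkNotNull).
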
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index 88bc7df..9ef4c1b 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -43,11 +43,11 @@
  * @param <V> the type of the value in this table
  */
 public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> {
-  private final TableWriteFunction<K, V> writeFn;
 
   private DefaultTableWriteMetrics writeMetrics;
 
   @VisibleForTesting
+  final TableWriteFunction<K, V> writeFn;
   final TableRateLimiter writeRateLimiter;
 
   public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index 3186fee..b3d82f3 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -80,10 +80,10 @@
   protected final ExecutorService callbackExecutor;
   protected final ExecutorService tableExecutor;
 
-  private final TableReadFunction<K, V> readFn;
   private DefaultTableReadMetrics readMetrics;
 
   @VisibleForTesting
+  final TableReadFunction<K, V> readFn;
   final TableRateLimiter<K, V> readRateLimiter;
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
index a8d419d..537ff87 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
@@ -24,6 +24,7 @@
 
 import org.apache.samza.operators.BaseTableDescriptor;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.table.utils.SerdeUtils;
 import org.apache.samza.util.EmbeddedTaggedRateLimiter;
 import org.apache.samza.util.RateLimiter;
@@ -70,6 +71,9 @@
   private TableRateLimiter.CreditFunction<K, V> readCreditFn;
   private TableRateLimiter.CreditFunction<K, V> writeCreditFn;
 
+  private TableRetryPolicy readRetryPolicy;
+  private TableRetryPolicy writeRetryPolicy;
+
   // By default execute future callbacks on the native client threads
   // ie. no additional thread pool for callbacks.
   private int asyncCallbackPoolSize = -1;
@@ -115,13 +119,23 @@
           "write credit function", writeCreditFn));
     }
 
+    if (readRetryPolicy != null) {
+      tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, SerdeUtils.serialize(
+          "read retry policy", readRetryPolicy));
+    }
+
+    if (writeRetryPolicy != null) {
+      tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, SerdeUtils.serialize(
+          "write retry policy", writeRetryPolicy));
+    }
+
     tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));
 
     return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig);
   }
 
   /**
-   * Use specified TableReadFunction with remote table.
+   * Use specified TableReadFunction with remote table, without a retry policy.
    * @param readFn read function instance
    * @return this table descriptor instance
    */
@@ -132,7 +146,7 @@
   }
 
   /**
-   * Use specified TableWriteFunction with remote table.
+   * Use specified TableWriteFunction with remote table, without a retry policy.
    * @param writeFn write function instance
    * @return this table descriptor instance
    */
@@ -143,6 +157,34 @@
   }
 
   /**
+   * Use specified TableReadFunction with remote table along with a retry policy.
+   * @param readFn read function instance
+   * @param retryPolicy retry policy for the read function
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) {
+    Preconditions.checkNotNull(readFn, "null read function");
+    Preconditions.checkNotNull(retryPolicy, "null retry policy");
+    this.readFn = readFn;
+    this.readRetryPolicy = retryPolicy;
+    return this;
+  }
+
+  /**
+   * Use specified TableWriteFunction with remote table along with a retry policy.
+   * @param writeFn write function instance
+   * @param retryPolicy retry policy for the write function
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(retryPolicy, "null retry policy");
+    this.writeFn = writeFn;
+    this.writeRetryPolicy = retryPolicy;
+    return this;
+  }
+
+  /**
    * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
    * of credits to be charged from the rate limiter for table read and write operations.
    * This is an advanced API that provides greater flexibility to throttle each record in the table
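
A hedged sketch of the new overloads in use. The read/write function classes, the table id, and the TableRetryPolicy setters below are illustrative assumptions (TableRetryPolicy itself is introduced elsewhere in this change); only the withReadFunction/withWriteFunction signatures come from this file:

    // Assumed builder-style setters on TableRetryPolicy corresponding to the getters
    // FailsafeAdapter reads (sleep time, max attempts).
    TableRetryPolicy retryPolicy = new TableRetryPolicy()
        .withFixedBackoff(Duration.ofMillis(100))
        .withStopAfterAttempts(3);

    RemoteTableDescriptor<String, Profile> desc =
        new RemoteTableDescriptor<String, Profile>("remote-profiles")    // constructor arg assumed
            .withReadFunction(new ProfileReadFunction(), retryPolicy)    // retried reads
            .withWriteFunction(new ProfileWriteFunction(), retryPolicy); // retried writes
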
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index 6c5d9b3..cae0bbd 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -25,11 +25,16 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.table.utils.BaseTableProvider;
 import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.table.utils.TableMetricsUtil;
 import org.apache.samza.util.RateLimiter;
 
 import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
@@ -47,6 +52,8 @@
   static final String READ_CREDIT_FN = "io.read.credit.func";
   static final String WRITE_CREDIT_FN = "io.write.credit.func";
   static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
+  static final String READ_RETRY_POLICY = "io.read.retry.policy";
+  static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
 
   private final boolean readOnly;
   private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
@@ -58,6 +65,7 @@
    */
   private static Map<String, ExecutorService> tableExecutors = new ConcurrentHashMap<>();
   private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
+  private static ScheduledExecutorService retryExecutor;
 
   public RemoteTableProvider(TableSpec tableSpec) {
     super(tableSpec);
@@ -72,7 +80,7 @@
     RemoteReadableTable table;
     String tableId = tableSpec.getId();
 
-    TableReadFunction<?, ?> readFn = getReadFn();
+    TableReadFunction readFn = getReadFn();
     RateLimiter rateLimiter = deserializeObject(RATE_LIMITER);
     if (rateLimiter != null) {
       rateLimiter.init(containerContext.config, taskContext);
@@ -83,11 +91,33 @@
     TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
     TableRateLimiter writeRateLimiter = null;
 
+    TableRetryPolicy readRetryPolicy = deserializeObject(READ_RETRY_POLICY);
+    // Deserialize the write retry policy up front so the shared retry executor is
+    // created whenever either policy is present.
+    TableRetryPolicy writeRetryPolicy = readOnly ? null : deserializeObject(WRITE_RETRY_POLICY);
+
+    if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
+      retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
+          Thread thread = new Thread(runnable);
+          thread.setName("table-retry-executor");
+          thread.setDaemon(true);
+          return thread;
+        });
+    }
+
+    if (readRetryPolicy != null) {
+      readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
+    }
+
+    TableWriteFunction writeFn = getWriteFn();
+
     boolean isRateLimited = readRateLimiter.isRateLimited();
     if (!readOnly) {
       writeCreditFn = deserializeObject(WRITE_CREDIT_FN);
       writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RL_WRITE_TAG);
       isRateLimited |= writeRateLimiter.isRateLimited();
+      if (writeRetryPolicy != null) {
+        writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
+      }
     }
 
     // Optional executor for future callback/completion. Shared by both read and write operations.
@@ -116,10 +146,18 @@
       table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter,
           tableExecutors.get(tableId), callbackExecutors.get(tableId));
     } else {
-      table = new RemoteReadWriteTable(tableSpec.getId(), readFn, getWriteFn(), readRateLimiter,
+      table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter,
           writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
     }
 
+    TableMetricsUtil metricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId);
+    if (readRetryPolicy != null) {
+      ((RetriableReadFunction) readFn).setMetrics(metricsUtil);
+    }
+    if (writeRetryPolicy != null) {
+      ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil);
+    }
+
     table.init(containerContext, taskContext);
     tables.add(table);
     return table;
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
index 5d0f963..4791779 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
@@ -100,5 +100,12 @@
             .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().join())));
   }
 
+  /**
+   * Determine whether the current operation can be retried with the last thrown exception.
+   * @param exception exception thrown by a table operation
+   * @return whether the operation can be retried
+   */
+  boolean isRetriable(Throwable exception);
+
   // optionally implement readObject() to initialize transient states
 }
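
isRetriable is added as a non-default method, so every TableReadFunction implementation has to classify its own failures. A hypothetical implementation sketch (the remote client and Profile type are placeholders):

    public class ProfileReadFunction implements TableReadFunction<String, Profile> {
      private transient RemoteStoreClient client;   // hypothetical client, created in init()

      @Override
      public CompletableFuture<Profile> getAsync(String key) {
        return CompletableFuture.supplyAsync(() -> client.get(key));
      }

      @Override
      public boolean isRetriable(Throwable exception) {
        // Retry transient I/O failures only; anything else is treated as permanent.
        return exception instanceof IOException || exception.getCause() instanceof IOException;
      }
    }
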
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
index 0ac3a0c..d9d619f 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
@@ -143,6 +143,13 @@
   }
 
   /**
+   * Determine whether the current operation can be retried with the last thrown exception.
+   * @param exception exception thrown by a table operation
+   * @return whether the operation can be retried
+   */
+  boolean isRetriable(Throwable exception);
+
+  /**
    * Flush the remote store (optional)
    */
   default void flush() {
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java
new file mode 100644
index 0000000..b2eccd8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.SamzaException;
+
+import net.jodah.failsafe.AsyncFailsafe;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
+
+
+/**
+ * Helper class adapting the generic {@link TableRetryPolicy} to a failsafe {@link RetryPolicy} and
+ * creating failsafe retryer instances with proper metrics management.
+ */
+class FailsafeAdapter {
+  /**
+   * Convert the {@link TableRetryPolicy} to a failsafe {@link RetryPolicy}.
+   * @param policy the {@link TableRetryPolicy} to convert
+   * @return the equivalent failsafe {@link RetryPolicy}
+   */
+  static RetryPolicy valueOf(TableRetryPolicy policy) {
+    RetryPolicy failSafePolicy = new RetryPolicy();
+
+    switch (policy.getBackoffType()) {
+      case NONE:
+        break;
+
+      case FIXED:
+        failSafePolicy.withDelay(policy.getSleepTime().toMillis(), TimeUnit.MILLISECONDS);
+        break;
+
+      case RANDOM:
+        failSafePolicy.withDelay(policy.getRandomMin().toMillis(), policy.getRandomMax().toMillis(), TimeUnit.MILLISECONDS);
+        break;
+
+      case EXPONENTIAL:
+        failSafePolicy.withBackoff(policy.getSleepTime().toMillis(), policy.getExponentialMaxSleep().toMillis(), TimeUnit.MILLISECONDS,
+            policy.getExponentialFactor());
+        break;
+
+      default:
+        throw new SamzaException("Unknown retry policy type.");
+    }
+
+    if (policy.getMaxDuration() != null) {
+      failSafePolicy.withMaxDuration(policy.getMaxDuration().toMillis(), TimeUnit.MILLISECONDS);
+    }
+    if (policy.getMaxAttempts() != null) {
+      failSafePolicy.withMaxRetries(policy.getMaxAttempts());
+    }
+    if (policy.getJitter() != null && policy.getBackoffType() != TableRetryPolicy.BackoffType.RANDOM) {
+      failSafePolicy.withJitter(policy.getJitter().toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    failSafePolicy.retryOn(e -> policy.getRetryPredicate().test(e));
+
+    return failSafePolicy;
+  }
+
+  /**
+   * Obtain an async failsafe retryer instance with the specified policy, metrics, and executor service.
+   * @param retryPolicy retry policy
+   * @param metrics retry metrics
+   * @param retryExec executor service for scheduling async retries
+   * @return {@link net.jodah.failsafe.AsyncFailsafe} instance
+   */
+  static AsyncFailsafe<?> failsafe(RetryPolicy retryPolicy, RetryMetrics metrics, ScheduledExecutorService retryExec) {
+    long startMs = System.currentTimeMillis();
+    return Failsafe.with(retryPolicy).with(retryExec)
+        .onRetry(e -> metrics.retryCount.inc())
+        .onRetriesExceeded(e -> {
+            metrics.retryTimer.update(System.currentTimeMillis() - startMs);
+            metrics.permFailureCount.inc();
+          })
+        .onSuccess((e, ctx) -> {
+            if (ctx.getExecutions() > 1) {
+              metrics.retryTimer.update(System.currentTimeMillis() - startMs);
+            } else {
+              metrics.successCount.inc();
+            }
+          });
+  }
+}
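
For context, this is how the adapter's output is consumed by the retry wrappers that follow: failsafe re-invokes the future-producing lambda on the shared retryExecutor until the policy's predicate, max attempts, or max duration stops it. A package-private usage sketch (the key and functions are illustrative):

    RetryPolicy failsafePolicy = FailsafeAdapter.valueOf(tableRetryPolicy);
    CompletableFuture<Profile> result =
        failsafe(failsafePolicy, retryMetrics, retryExecutor)
            .future(() -> readFn.getAsync("user-42"));
    // Every re-attempt bumps retryCount; exhausting the policy bumps permFailureCount
    // and records the total elapsed time in retryTimer.
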
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
new file mode 100644
index 0000000..1adddc0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Predicate;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import net.jodah.failsafe.RetryPolicy;
+
+import static org.apache.samza.table.retry.FailsafeAdapter.failsafe;
+
+
+/**
+ * Wrapper for a {@link TableReadFunction} instance to add common retry
+ * support with a {@link TableRetryPolicy}. This wrapper is created by
+ * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry
+ * policy is specified together with the {@link TableReadFunction}.
+ *
+ * Actual retry mechanism is provided by the failsafe library. Retry is
+ * attempted in an async way with a {@link ScheduledExecutorService}.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RetriableReadFunction<K, V> implements TableReadFunction<K, V> {
+  private final RetryPolicy retryPolicy;
+  private final TableReadFunction<K, V> readFn;
+  private final ScheduledExecutorService retryExecutor;
+
+  @VisibleForTesting
+  RetryMetrics retryMetrics;
+
+  public RetriableReadFunction(TableRetryPolicy policy, TableReadFunction<K, V> readFn,
+      ScheduledExecutorService retryExecutor) {
+    Preconditions.checkNotNull(policy);
+    Preconditions.checkNotNull(readFn);
+    Preconditions.checkNotNull(retryExecutor);
+
+    this.readFn = readFn;
+    this.retryExecutor = retryExecutor;
+    Predicate<Throwable> retryPredicate = policy.getRetryPredicate();
+    policy.withRetryPredicate((ex) -> readFn.isRetriable(ex) || retryPredicate.test(ex));
+    this.retryPolicy = FailsafeAdapter.valueOf(policy);
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> readFn.getAsync(key))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to get the record for " + key + " after retries.", e);
+          });
+  }
+
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> readFn.getAllAsync(keys))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to get the records for " + keys + " after retries.", e);
+          });
+  }
+
+  @Override
+  public boolean isRetriable(Throwable exception) {
+    return readFn.isRetriable(exception);
+  }
+
+  /**
+   * Initialize retry-related metrics
+   * @param metricsUtil metrics util
+   */
+  public void setMetrics(TableMetricsUtil metricsUtil) {
+    this.retryMetrics = new RetryMetrics("reader", metricsUtil);
+  }
+}
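
Callers of the wrapped function still see a plain CompletableFuture; once the retry policy is exhausted, the future completes exceptionally with the SamzaException raised in exceptionally(...). An illustrative caller:

    retriableReadFn.getAsync("user-42")
        .whenComplete((profile, error) -> {
          if (error != null) {
            // error (or its cause) is the "Failed to get the record ..." SamzaException
            log.error("Remote table lookup permanently failed", error);
          } else {
            handle(profile);   // placeholder downstream handling
          }
        });
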
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
new file mode 100644
index 0000000..2f3f062
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Predicate;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import net.jodah.failsafe.RetryPolicy;
+
+import static org.apache.samza.table.retry.FailsafeAdapter.failsafe;
+
+
+/**
+ * Wrapper for a {@link TableWriteFunction} instance to add common retry
+ * support with a {@link TableRetryPolicy}. This wrapper is created by
+ * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry
+ * policy is specified together with the {@link TableWriteFunction}.
+ *
+ * Actual retry mechanism is provided by the failsafe library. Retry is
+ * attempted in an async way with a {@link ScheduledExecutorService}.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RetriableWriteFunction<K, V> implements TableWriteFunction<K, V> {
+  private final RetryPolicy retryPolicy;
+  private final TableWriteFunction<K, V> writeFn;
+  private final ScheduledExecutorService retryExecutor;
+
+  @VisibleForTesting
+  RetryMetrics retryMetrics;
+
+  public RetriableWriteFunction(TableRetryPolicy policy, TableWriteFunction<K, V> writeFn,
+      ScheduledExecutorService retryExecutor)  {
+    Preconditions.checkNotNull(policy);
+    Preconditions.checkNotNull(writeFn);
+    Preconditions.checkNotNull(retryExecutor);
+
+    this.writeFn = writeFn;
+    this.retryExecutor = retryExecutor;
+    Predicate<Throwable> retryPredicate = policy.getRetryPredicate();
+    policy.withRetryPredicate((ex) -> writeFn.isRetriable(ex) || retryPredicate.test(ex));
+    this.retryPolicy = FailsafeAdapter.valueOf(policy);
+  }
+
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V record) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> writeFn.putAsync(key, record))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to put the record for " + key + " after retries.", e);
+          });
+  }
+
+  @Override
+  public CompletableFuture<Void> putAllAsync(Collection<Entry<K, V>> records) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> writeFn.putAllAsync(records))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to put records after retries.", e);
+          });
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> writeFn.deleteAsync(key))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to delete the record for " + key + " after retries.", e);
+          });
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(Collection<K> keys) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> writeFn.deleteAllAsync(keys))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to delete the records for " + keys + " after retries.", e);
+          });
+  }
+
+  @Override
+  public boolean isRetriable(Throwable exception) {
+    return writeFn.isRetriable(exception);
+  }
+
+  /**
+   * Initialize retry-related metrics.
+   * @param metricsUtil metrics util
+   */
+  public void setMetrics(TableMetricsUtil metricsUtil) {
+    this.retryMetrics = new RetryMetrics("writer", metricsUtil);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java
new file mode 100644
index 0000000..fbc511c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+
+/**
+ * Wrapper of retry-related metrics common to both {@link RetriableReadFunction} and
+ * {@link RetriableWriteFunction}.
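+ *
+ * For example, with prefix {@code "reader"} the metrics are registered as
+ * {@code reader-retry-count}, {@code reader-success-count}, {@code reader-perm-failure-count}
+ * and {@code reader-retry-timer}.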
+ */
+class RetryMetrics {
+  /**
+   * Number of retries executed (excluding the first attempt)
+   */
+  final Counter retryCount;
+
+  /**
+   * Number of operations that succeeded on the first attempt (without any retries)
+   */
+  final Counter successCount;
+
+  /**
+   * Number of operations that failed permanently and exhausted all retries
+   */
+  final Counter permFailureCount;
+
+  /**
+   * Total time spent in each IO; this is updated only
+   * when at least one retry has been attempted.
+   */
+  final Timer retryTimer;
+
+  public RetryMetrics(String prefix, TableMetricsUtil metricsUtil) {
+    retryCount = metricsUtil.newCounter(prefix + "-retry-count");
+    successCount = metricsUtil.newCounter(prefix + "-success-count");
+    permFailureCount = metricsUtil.newCounter(prefix + "-perm-failure-count");
+    retryTimer = metricsUtil.newTimer(prefix + "-retry-timer");
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
new file mode 100644
index 0000000..162eb07
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Common retry policy parameters for table IO. This serves as an abstraction on top of
+ * retry libraries. It supports the following features:
+ *  - backoff modes: fixed, random, exponential
+ *  - termination modes: by attempts, by duration
+ *  - jitter
+ *
+ * Retry libraries can implement a subset or all features as described by this common policy.
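+ *
+ * A minimal illustrative sketch of building a policy (the durations and attempt
+ * count below are arbitrary example values):
+ * <pre>{@code
+ *   TableRetryPolicy policy = new TableRetryPolicy()
+ *       .withExponentialBackoff(Duration.ofMillis(10), Duration.ofSeconds(1), 2.0)
+ *       .withJitter(Duration.ofMillis(5))
+ *       .withStopAfterAttempts(5);
+ * }</pre>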
+ */
+public class TableRetryPolicy implements Serializable {
+  enum BackoffType {
+    /**
+     * No backoff in between two retry attempts.
+     */
+    NONE,
+
+    /**
+     * Backoff by a fixed duration {@code sleepTime}.
+     */
+    FIXED,
+
+    /**
+     * Backoff by a randomly selected duration between {@code minSleep} and {@code maxSleep}.
+     */
+    RANDOM,
+
+    /**
+     * Backoff by durations that grow exponentially by {@code exponentialFactor}, starting from {@code sleepTime}.
+     */
+    EXPONENTIAL
+  }
+
+  // Backoff parameters
+  private Duration sleepTime;
+  private Duration randomMin;
+  private Duration randomMax;
+  private double exponentialFactor;
+  private Duration exponentialMaxSleep;
+  private Duration jitter;
+
+  // By default no early termination
+  private Integer maxAttempts = null;
+  private Duration maxDuration = null;
+
+  // By default no backoff during retries
+  private BackoffType backoffType = BackoffType.NONE;
+
+  /**
+   * Serializable adapter interface for {@link java.util.function.Predicate}.
+   * This is needed because TableRetryPolicy needs to be serializable as part of the
+   * table config whereas {@link java.util.function.Predicate} is not serializable.
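+   *
+   * An illustrative lambda sketch (the exception type is an arbitrary example):
+   * <pre>{@code
+   *   RetryPredicate retryOnTimeout = ex -> ex instanceof java.util.concurrent.TimeoutException;
+   * }</pre>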
+   */
+  public interface RetryPredicate extends Predicate<Throwable>, Serializable {
+  }
+
+  // By default no custom retry predicate so retry decision is made solely by the table functions
+  private RetryPredicate retryPredicate = (ex) -> false;
+
+  /**
+   * Set the sleep time for the fixed backoff policy.
+   * @param sleepTime sleep time between attempts
+   * @return this policy instance
+   */
+  public TableRetryPolicy withFixedBackoff(Duration sleepTime) {
+    Preconditions.checkNotNull(sleepTime);
+    this.sleepTime = sleepTime;
+    this.backoffType = BackoffType.FIXED;
+    return this;
+  }
+
+  /**
+   * Set the sleep time bounds for the random backoff policy. The actual sleep time
+   * before each attempt is randomly selected from {@code [minSleep, maxSleep]}.
+   * @param minSleep lower bound of the sleep time
+   * @param maxSleep upper bound of the sleep time
+   * @return this policy instance
+   */
+  public TableRetryPolicy withRandomBackoff(Duration minSleep, Duration maxSleep) {
+    Preconditions.checkNotNull(minSleep);
+    Preconditions.checkNotNull(maxSleep);
+    this.randomMin = minSleep;
+    this.randomMax = maxSleep;
+    this.backoffType = BackoffType.RANDOM;
+    return this;
+  }
+
+  /**
+   * Set the parameters for the exponential backoff policy. The actual sleep time
+   * grows exponentially, multiplying successive delays by {@code factor} up to
+   * {@code maxSleep}.
+   * @param sleepTime initial sleep time
+   * @param maxSleep upper bound of the sleep time
+   * @param factor exponential factor for backoff
+   * @return this policy instance
+   */
+  public TableRetryPolicy withExponentialBackoff(Duration sleepTime, Duration maxSleep, double factor) {
+    Preconditions.checkNotNull(sleepTime);
+    Preconditions.checkNotNull(maxSleep);
+    this.sleepTime = sleepTime;
+    this.exponentialMaxSleep = maxSleep;
+    this.exponentialFactor = factor;
+    this.backoffType = BackoffType.EXPONENTIAL;
+    return this;
+  }
+
+  /**
+   * Set the jitter for the backoff policy to provide additional randomness.
+   * If this is set, a random value between {@code [0, jitter]} will be added
+   * to each sleep time. This applies to {@code FIXED} and {@code EXPONENTIAL}
+   * modes only.
+   * @param jitter upper bound of the random value added to each sleep time
+   * @return this policy instance
+   */
+  public TableRetryPolicy withJitter(Duration jitter) {
+    Preconditions.checkNotNull(jitter);
+    if (backoffType != BackoffType.RANDOM) {
+      this.jitter = jitter;
+    }
+    return this;
+  }
+
+  /**
+   * Set the maximum number of retry attempts (excluding the initial attempt) before terminating the operation.
+   * @param maxAttempts maximum number of retry attempts
+   * @return this policy instance
+   */
+  public TableRetryPolicy withStopAfterAttempts(int maxAttempts) {
+    Preconditions.checkArgument(maxAttempts >= 0);
+    this.maxAttempts = maxAttempts;
+    return this;
+  }
+
+  /**
+   * Set the maximum total duration (sleep + execution time) before terminating the operation.
+   * @param maxDelay maximum total duration
+   * @return this policy instance
+   */
+  public TableRetryPolicy withStopAfterDelay(Duration maxDelay) {
+    Preconditions.checkNotNull(maxDelay);
+    this.maxDuration = maxDelay;
+    return this;
+  }
+
+  /**
+   * Set the predicate to use for identifying retriable exceptions. If specified, the table
+   * retry logic consults both this predicate and the table function, and a retry is
+   * attempted if either returns true.
+   * @param retryPredicate predicate for retriable exception identification
+   * @return this policy instance
+   */
+  public TableRetryPolicy withRetryPredicate(RetryPredicate retryPredicate) {
+    Preconditions.checkNotNull(retryPredicate);
+    this.retryPredicate = retryPredicate;
+    return this;
+  }
+
+  /**
+   * @return initial/fixed sleep time.
+   */
+  public Duration getSleepTime() {
+    return sleepTime;
+  }
+
+  /**
+   * @return lower bound of the sleep time for random backoff, or null if {@code backoffType} is not {@code RANDOM}.
+   */
+  public Duration getRandomMin() {
+    return randomMin;
+  }
+
+  /**
+   * @return upper bound of the sleep time for random backoff, or null if {@code backoffType} is not {@code RANDOM}.
+   */
+  public Duration getRandomMax() {
+    return randomMax;
+  }
+
+  /**
+   * @return exponential factor for exponential backoff.
+   */
+  public double getExponentialFactor() {
+    return exponentialFactor;
+  }
+
+  /**
+   * @return maximum sleep time for exponential backoff, or null if {@code backoffType} is not {@code EXPONENTIAL}.
+   */
+  public Duration getExponentialMaxSleep() {
+    return exponentialMaxSleep;
+  }
+
+  /**
+   * Randomness added to the sleep time.
+   * @return jitter added to each backoff, or null if not set.
+   */
+  public Duration getJitter() {
+    return jitter;
+  }
+
+  /**
+   * Termination after a fixed number of retry attempts.
+   * @return maximum number of retry attempts without success before giving up the operation, or null if not set.
+   */
+  public Integer getMaxAttempts() {
+    return maxAttempts;
+  }
+
+  /**
+   * Termination after a fixed duration.
+   * @return maximum duration without success before giving up the operation or null if not set.
+   */
+  public Duration getMaxDuration() {
+    return maxDuration;
+  }
+
+  /**
+   * @return type of the backoff.
+   */
+  public BackoffType getBackoffType() {
+    return backoffType;
+  }
+
+  /**
+   * @return custom predicate for retriable exception identification; defaults to a predicate that always returns false.
+   */
+  public RetryPredicate getRetryPredicate() {
+    return retryPredicate;
+  }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index ddcaa5e..fc8780f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -39,6 +39,7 @@
    */
   val CONFIG_REWRITERS = "job.config.rewriters" // streaming.job_config_rewriters
   val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // streaming.job_config_rewriter_class - regex, system, config
+  val CONFIG_JOB_PREFIX = "jobs.%s."
   val JOB_NAME = "job.name" // streaming.job_name
   val JOB_ID = "job.id" // streaming.job_id
   val SAMZA_FWK_PATH = "samza.fwk.path"
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 61cf6c5..c089225 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -28,10 +28,12 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
@@ -72,7 +74,6 @@
   private GenericInputDescriptor<KV<Object, Object>> input2Descriptor;
   private StreamSpec input3Spec;
   private GenericInputDescriptor<KV<Object, Object>> input3Descriptor;
-  private StreamSpec input4Spec;
   private GenericInputDescriptor<KV<Object, Object>> input4Descriptor;
   private StreamSpec output1Spec;
   private GenericOutputDescriptor<KV<Object, Object>> output1Descriptor;
@@ -168,44 +169,49 @@
   private StreamApplicationDescriptorImpl createStreamGraphWithJoinAndWindow() {
 
     return new StreamApplicationDescriptorImpl(appDesc -> {
-        MessageStream<KV<Object, Object>> messageStream1 =
-            appDesc.getInputStream(input1Descriptor)
-                .map(m -> m);
+        MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor).map(m -> m);
         MessageStream<KV<Object, Object>> messageStream2 =
-            appDesc.getInputStream(input2Descriptor)
-                .partitionBy(m -> m.key, m -> m.value, "p1")
-                .filter(m -> true);
+          appDesc.getInputStream(input2Descriptor).partitionBy(m -> m.key, m -> m.value, "p1").filter(m -> true);
         MessageStream<KV<Object, Object>> messageStream3 =
-            appDesc.getInputStream(input3Descriptor)
-                .filter(m -> true)
-                .partitionBy(m -> m.key, m -> m.value, "p2")
-                .map(m -> m);
+          appDesc.getInputStream(input3Descriptor).filter(m -> true).partitionBy(m -> m.key, m -> m.value, "p2").map(m -> m);
         OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
         OutputStream<KV<Object, Object>> output2 = appDesc.getOutputStream(output2Descriptor);
 
         messageStream1.map(m -> m)
-            .filter(m->true)
-            .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
+          .filter(m -> true)
+          .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
 
         messageStream2.map(m -> m)
-            .filter(m->true)
-            .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
+          .filter(m -> true)
+          .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
+
+        messageStream1.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+          mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1").sendTo(output1);
+        messageStream3.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+          mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2").sendTo(output2);
+        messageStream3.join(messageStream2, (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+          mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3").sendTo(output2);
+      }, config);
+  }
+
+  private StreamApplicationDescriptorImpl createStreamGraphWithInvalidJoin() {
+    /**
+     *   input1 (64) --
+     *                 |
+     *                join -> output1 (8)
+     *                 |
+     *   input3 (32) --
+     */
+    return new StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor);
+        MessageStream<KV<Object, Object>> messageStream3 = appDesc.getInputStream(input3Descriptor);
+        OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
 
         messageStream1
-            .join(messageStream2,
-                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1")
-            .sendTo(output1);
-        messageStream3
-            .join(messageStream2,
-                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2")
-            .sendTo(output2);
-        messageStream3
-            .join(messageStream2,
-                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3")
-            .sendTo(output2);
+          .join(messageStream3,
+              (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+              mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
+          .sendTo(output1);
       }, config);
   }
 
@@ -225,7 +231,6 @@
     input1Spec = new StreamSpec("input1", "input1", "system1");
     input2Spec = new StreamSpec("input2", "input2", "system2");
     input3Spec = new StreamSpec("input3", "input3", "system2");
-    input4Spec = new StreamSpec("input4", "input4", "system1");
 
     output1Spec = new StreamSpec("output1", "output1", "system1");
     output2Spec = new StreamSpec("output2", "output2", "system2");
@@ -265,8 +270,8 @@
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
 
     JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
-    assertTrue(jobGraph.getSources().size() == 3);
-    assertTrue(jobGraph.getSinks().size() == 2);
+    assertTrue(jobGraph.getInputStreams().size() == 3);
+    assertTrue(jobGraph.getOutputStreams().size() == 2);
     assertTrue(jobGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy
   }
 
@@ -276,7 +281,7 @@
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
     JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
-    ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
+    ExecutionPlanner.fetchInputAndOutputStreamPartitions(jobGraph, streamManager);
     assertTrue(jobGraph.getOrCreateStreamEdge(input1Spec).getPartitionCount() == 64);
     assertTrue(jobGraph.getOrCreateStreamEdge(input2Spec).getPartitionCount() == 16);
     assertTrue(jobGraph.getOrCreateStreamEdge(input3Spec).getPartitionCount() == 32);
@@ -294,8 +299,8 @@
     StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
     JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
-    ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
-    ExecutionPlanner.calculateJoinInputPartitions(jobGraph, config);
+    ExecutionPlanner.fetchInputAndOutputStreamPartitions(jobGraph, streamManager);
+    ExecutionPlanner.calculateJoinInputPartitions(jobGraph, new StreamConfig(config));
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -303,6 +308,14 @@
       });
   }
 
+  @Test(expected = SamzaException.class)
+  public void testRejectsInvalidJoin() {
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+    StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithInvalidJoin();
+
+    planner.plan(graphSpec.getOperatorSpecGraph());
+  }
+
   @Test
   public void testDefaultPartitions() {
     Map<String, String> map = new HashMap<>(config);
@@ -321,7 +334,7 @@
   }
 
   @Test
-  public void testTriggerIntervalForJoins() throws Exception {
+  public void testTriggerIntervalForJoins() {
     Map<String, String> map = new HashMap<>(config);
     map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
     Config cfg = new MapConfig(map);
@@ -336,7 +349,7 @@
   }
 
   @Test
-  public void testTriggerIntervalForWindowsAndJoins() throws Exception {
+  public void testTriggerIntervalForWindowsAndJoins() {
     Map<String, String> map = new HashMap<>(config);
     map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
     Config cfg = new MapConfig(map);
@@ -352,7 +365,7 @@
   }
 
   @Test
-  public void testTriggerIntervalWithInvalidWindowMs() throws Exception {
+  public void testTriggerIntervalWithInvalidWindowMs() {
     Map<String, String> map = new HashMap<>(config);
     map.put(TaskConfig.WINDOW_MS(), "-1");
     map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
@@ -368,9 +381,8 @@
     assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
   }
 
-
   @Test
-  public void testTriggerIntervalForStatelessOperators() throws Exception {
+  public void testTriggerIntervalForStatelessOperators() {
     Map<String, String> map = new HashMap<>(config);
     map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
     Config cfg = new MapConfig(map);
@@ -384,7 +396,7 @@
   }
 
   @Test
-  public void testTriggerIntervalWhenWindowMsIsConfigured() throws Exception {
+  public void testTriggerIntervalWhenWindowMsIsConfigured() {
     Map<String, String> map = new HashMap<>(config);
     map.put(TaskConfig.WINDOW_MS(), "2000");
     map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
@@ -399,7 +411,7 @@
   }
 
   @Test
-  public void testCalculateIntStreamPartitions() throws Exception {
+  public void testCalculateIntStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
     JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph());
@@ -423,10 +435,10 @@
     edge.setPartitionCount(16);
     edges.add(edge);
 
-    assertEquals(32, ExecutionPlanner.maxPartition(edges));
+    assertEquals(32, ExecutionPlanner.maxPartitions(edges));
 
     edges = Collections.emptyList();
-    assertEquals(StreamEdge.PARTITIONS_UNKNOWN, ExecutionPlanner.maxPartition(edges));
+    assertEquals(StreamEdge.PARTITIONS_UNKNOWN, ExecutionPlanner.maxPartitions(edges));
   }
 
   @Test
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
index 73452d8..ed35d67 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
@@ -74,9 +74,9 @@
     JobNode n10 = graph1.getOrCreateJobNode("10", "1");
     JobNode n11 = graph1.getOrCreateJobNode("11", "1");
 
-    graph1.addSource(genStream(), n5);
-    graph1.addSource(genStream(), n7);
-    graph1.addSource(genStream(), n3);
+    graph1.addInputStream(genStream(), n5);
+    graph1.addInputStream(genStream(), n7);
+    graph1.addInputStream(genStream(), n3);
     graph1.addIntermediateStream(genStream(), n5, n11);
     graph1.addIntermediateStream(genStream(), n7, n11);
     graph1.addIntermediateStream(genStream(), n7, n8);
@@ -85,9 +85,9 @@
     graph1.addIntermediateStream(genStream(), n11, n9);
     graph1.addIntermediateStream(genStream(), n8, n9);
     graph1.addIntermediateStream(genStream(), n11, n10);
-    graph1.addSink(genStream(), n2);
-    graph1.addSink(genStream(), n9);
-    graph1.addSink(genStream(), n10);
+    graph1.addOutputStream(genStream(), n2);
+    graph1.addOutputStream(genStream(), n9);
+    graph1.addOutputStream(genStream(), n10);
   }
 
   /**
@@ -108,7 +108,7 @@
     JobNode n6 = graph2.getOrCreateJobNode("6", "1");
     JobNode n7 = graph2.getOrCreateJobNode("7", "1");
 
-    graph2.addSource(genStream(), n1);
+    graph2.addInputStream(genStream(), n1);
     graph2.addIntermediateStream(genStream(), n1, n2);
     graph2.addIntermediateStream(genStream(), n2, n3);
     graph2.addIntermediateStream(genStream(), n3, n4);
@@ -117,7 +117,7 @@
     graph2.addIntermediateStream(genStream(), n6, n2);
     graph2.addIntermediateStream(genStream(), n5, n5);
     graph2.addIntermediateStream(genStream(), n5, n7);
-    graph2.addSink(genStream(), n7);
+    graph2.addOutputStream(genStream(), n7);
   }
 
   /**
@@ -132,7 +132,7 @@
     JobNode n1 = graph3.getOrCreateJobNode("1", "1");
     JobNode n2 = graph3.getOrCreateJobNode("2", "1");
 
-    graph3.addSource(genStream(), n1);
+    graph3.addInputStream(genStream(), n1);
     graph3.addIntermediateStream(genStream(), n1, n1);
     graph3.addIntermediateStream(genStream(), n1, n2);
     graph3.addIntermediateStream(genStream(), n2, n2);
@@ -149,7 +149,7 @@
 
     JobNode n1 = graph4.getOrCreateJobNode("1", "1");
 
-    graph4.addSource(genStream(), n1);
+    graph4.addInputStream(genStream(), n1);
     graph4.addIntermediateStream(genStream(), n1, n1);
   }
 
@@ -180,12 +180,12 @@
     StreamSpec s1 = genStream();
     StreamSpec s2 = genStream();
     StreamSpec s3 = genStream();
-    graph.addSource(s1, n1);
-    graph.addSource(s2, n1);
-    graph.addSource(s3, n2);
-    graph.addSource(s3, n3);
+    graph.addInputStream(s1, n1);
+    graph.addInputStream(s2, n1);
+    graph.addInputStream(s3, n2);
+    graph.addInputStream(s3, n3);
 
-    assertTrue(graph.getSources().size() == 3);
+    assertTrue(graph.getInputStreams().size() == 3);
 
     assertTrue(graph.getOrCreateJobNode("1", "1").getInEdges().size() == 2);
     assertTrue(graph.getOrCreateJobNode("2", "1").getInEdges().size() == 1);
@@ -214,11 +214,11 @@
     StreamSpec s1 = genStream();
     StreamSpec s2 = genStream();
     StreamSpec s3 = genStream();
-    graph.addSink(s1, n1);
-    graph.addSink(s2, n2);
-    graph.addSink(s3, n2);
+    graph.addOutputStream(s1, n1);
+    graph.addOutputStream(s2, n2);
+    graph.addOutputStream(s3, n2);
 
-    assertTrue(graph.getSinks().size() == 3);
+    assertTrue(graph.getOutputStreams().size() == 3);
     assertTrue(graph.getOrCreateJobNode("1", "1").getOutEdges().size() == 1);
     assertTrue(graph.getOrCreateJobNode("2", "1").getOutEdges().size() == 2);
 
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index 24178d0..42f05c0 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -124,11 +124,14 @@
     TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
     tableManager.init(mock(SamzaContainerContext.class), mock(TaskContext.class));
 
-    Table table = tableManager.getTable(TABLE_ID);
-    verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject(), anyObject());
-    Assert.assertEquals(DummyTableProviderFactory.table, table);
+    for (int i = 0; i < 2; i++) {
+      Table table = tableManager.getTable(TABLE_ID);
+      verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject(), anyObject());
+      verify(DummyTableProviderFactory.tableProvider, times(1)).getTable();
+      Assert.assertEquals(DummyTableProviderFactory.table, table);
+    }
 
-    Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tables");
+    Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tableContexts");
     TableManager.TableCtx ctx = ctxMap.get(TABLE_ID);
 
     TableSpec tableSpec = getFieldValue(ctx, "tableSpec");
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
index 21fc6a5..3e844c3 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.metrics.Counter;
@@ -34,6 +35,9 @@
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.task.TaskContext;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -43,6 +47,7 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -50,6 +55,18 @@
 
 
 public class TestRemoteTable {
+  private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
+
+  public static TaskContext getMockTaskContext() {
+    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+    doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
+    doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
+    doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
+    TaskContext taskContext = mock(TaskContext.class);
+    doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
+    return taskContext;
+  }
+
   private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
       TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
     return getTable(tableId, readFn, writeFn, null);
@@ -72,12 +89,7 @@
       table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
     }
 
-    TaskContext taskContext = mock(TaskContext.class);
-    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
-    doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
-    doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
-    doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any());
-    doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
+    TaskContext taskContext = getMockTaskContext();
 
     SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
 
@@ -86,35 +98,53 @@
     return (T) table;
   }
 
-  private void doTestGet(boolean sync, boolean error) throws Exception {
+  private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
+    String tableId = "testGet-" + sync + error + retry;
     TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
     // Sync is backed by async so needs to mock the async method
     CompletableFuture<String> future;
     if (error) {
       future = new CompletableFuture();
       future.completeExceptionally(new RuntimeException("Test exception"));
+      if (!retry) {
+        doReturn(future).when(readFn).getAsync(anyString());
+      } else {
+        final int [] times = new int[] {0};
+        doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
+            .when(readFn).getAsync(anyString());
+      }
     } else {
       future = CompletableFuture.completedFuture("bar");
+      doReturn(future).when(readFn).getAsync(anyString());
     }
-    doReturn(future).when(readFn).getAsync(anyString());
-    RemoteReadableTable<String, String> table = getTable("testGet-" + sync + error, readFn, null);
+    if (retry) {
+      doReturn(true).when(readFn).isRetriable(any());
+      TableRetryPolicy policy = new TableRetryPolicy();
+      readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
+    }
+    RemoteReadableTable<String, String> table = getTable(tableId, readFn, null);
     Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
     verify(table.readRateLimiter, times(1)).throttle(anyString());
   }
 
   @Test
   public void testGet() throws Exception {
-    doTestGet(true, false);
+    doTestGet(true, false, false);
   }
 
   @Test
   public void testGetAsync() throws Exception {
-    doTestGet(false, false);
+    doTestGet(false, false, false);
   }
 
   @Test(expected = ExecutionException.class)
   public void testGetAsyncError() throws Exception {
-    doTestGet(false, true);
+    doTestGet(false, true, false);
+  }
+
+  @Test
+  public void testGetAsyncErrorRetried() throws Exception {
+    doTestGet(false, true, true);
   }
 
   @Test
@@ -139,23 +169,36 @@
           });
   }
 
-  private void doTestPut(boolean sync, boolean error, boolean isDelete) throws Exception {
-    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testPut-" + sync + error + isDelete,
-        mock(TableReadFunction.class), writeFn);
-    CompletableFuture<Void> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
+  private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
+    String tableId = "testPut-" + sync + error + isDelete + retry;
+    TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+    TableWriteFunction<String, String> writeFn = mockWriteFn;
+    CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
+    CompletableFuture<Void> failureFuture = new CompletableFuture();
+    failureFuture.completeExceptionally(new RuntimeException("Test exception"));
+    if (!error) {
+      if (isDelete) {
+        doReturn(successFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doReturn(successFuture).when(writeFn).putAsync(any(), any());
+      }
+    } else if (!retry) {
+      if (isDelete) {
+        doReturn(failureFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doReturn(failureFuture).when(writeFn).putAsync(any(), any());
+      }
     } else {
-      future = CompletableFuture.completedFuture(null);
+      doReturn(true).when(writeFn).isRetriable(any());
+      final int [] times = new int[] {0};
+      if (isDelete) {
+        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
+      }
+      writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
     }
-    // Sync is backed by async so needs to mock the async method
-    if (isDelete) {
-      doReturn(future).when(writeFn).deleteAsync(any());
-    } else {
-      doReturn(future).when(writeFn).putAsync(any(), any());
-    }
+    RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
     if (sync) {
       table.put("foo", isDelete ? null : "bar");
     } else {
@@ -164,9 +207,9 @@
     ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
     ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
     if (isDelete) {
-      verify(writeFn, times(1)).deleteAsync(keyCaptor.capture());
+      verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
     } else {
-      verify(writeFn, times(1)).putAsync(keyCaptor.capture(), valCaptor.capture());
+      verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
       Assert.assertEquals("bar", valCaptor.getValue());
     }
     Assert.assertEquals("foo", keyCaptor.getValue());
@@ -179,27 +222,32 @@
 
   @Test
   public void testPut() throws Exception {
-    doTestPut(true, false, false);
+    doTestPut(true, false, false, false);
   }
 
   @Test
   public void testPutDelete() throws Exception {
-    doTestPut(true, false, true);
+    doTestPut(true, false, true, false);
   }
 
   @Test
   public void testPutAsync() throws Exception {
-    doTestPut(false, false, false);
+    doTestPut(false, false, false, false);
   }
 
   @Test
   public void testPutAsyncDelete() throws Exception {
-    doTestPut(false, false, true);
+    doTestPut(false, false, true, false);
   }
 
   @Test(expected = ExecutionException.class)
   public void testPutAsyncError() throws Exception {
-    doTestPut(false, true, false);
+    doTestPut(false, true, false, false);
+  }
+
+  @Test
+  public void testPutAsyncErrorRetried() throws Exception {
+    doTestPut(false, true, false, true);
   }
 
   private void doTestDelete(boolean sync, boolean error) throws Exception {
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
index e30da12..efe1acf 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
@@ -31,6 +31,9 @@
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.EmbeddedTaggedRateLimiter;
 import org.apache.samza.util.RateLimiter;
@@ -138,7 +141,9 @@
   private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean rlGets, boolean rlPuts) {
     int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0);
     RemoteTableDescriptor<String, String> desc = new RemoteTableDescriptor("1");
-    desc.withReadFunction(mock(TableReadFunction.class));
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    retryPolicy.withRetryPredicate((ex) -> false);
+    desc.withReadFunction(mock(TableReadFunction.class), retryPolicy);
     desc.withWriteFunction(mock(TableWriteFunction.class));
     desc.withAsyncCallbackExecutorPoolSize(10);
 
@@ -178,6 +183,9 @@
 
     ThreadPoolExecutor callbackExecutor = (ThreadPoolExecutor) rwTable.callbackExecutor;
     Assert.assertEquals(10, callbackExecutor.getCorePoolSize());
+
+    Assert.assertTrue(rwTable.readFn instanceof RetriableReadFunction);
+    Assert.assertFalse(rwTable.writeFn instanceof RetriableWriteFunction);
   }
 
   @Test
diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
new file mode 100644
index 0000000..9dd5a74
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.remote.TestRemoteTable;
+import org.apache.samza.table.utils.TableMetricsUtil;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestRetriableTableFunctions {
+  private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
+
+  public TableMetricsUtil getMetricsUtil(String tableId) {
+    Table table = mock(Table.class);
+    SamzaContainerContext cntCtx = mock(SamzaContainerContext.class);
+    TaskContext taskCtx = TestRemoteTable.getMockTaskContext();
+    return new TableMetricsUtil(cntCtx, taskCtx, table, tableId);
+  }
+
+  @Test
+  public void testFirstTimeSuccessGet() throws Exception {
+    String tableId = "testFirstTimeSuccessGet";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(100));
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    doReturn(true).when(readFn).isRetriable(any());
+    doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
+    RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    Assert.assertEquals("bar", retryIO.getAsync("foo").get());
+    verify(readFn, times(1)).getAsync(anyString());
+
+    Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount());
+    Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertEquals(0, retryIO.retryMetrics.retryTimer.getSnapshot().getMax());
+  }
+
+  @Test
+  public void testRetryEngagedGet() throws Exception {
+    String tableId = "testRetryEngagedGet";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(10));
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    doReturn(true).when(readFn).isRetriable(any());
+
+    int [] times = new int[] {0};
+    Map<String, String> map = new HashMap<>();
+    map.put("foo1", "bar1");
+    map.put("foo2", "bar2");
+    doAnswer(invocation -> {
+        CompletableFuture<Map<String, String>> future = new CompletableFuture();
+        if (times[0] > 0) {
+          future.complete(map);
+        } else {
+          times[0]++;
+          future.completeExceptionally(new RuntimeException("test exception"));
+        }
+        return future;
+      }).when(readFn).getAllAsync(any());
+
+    RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    Assert.assertEquals(map, retryIO.getAllAsync(Arrays.asList("foo1", "foo2")).get());
+    verify(readFn, times(2)).getAllAsync(any());
+
+    Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount());
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+  }
+
+  @Test
+  public void testRetryExhaustedTimeGet() throws Exception {
+    String tableId = "testRetryExhaustedTime";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(5));
+    policy.withStopAfterDelay(Duration.ofMillis(100));
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    doReturn(true).when(readFn).isRetriable(any());
+
+    CompletableFuture<String> future = new CompletableFuture();
+    future.completeExceptionally(new RuntimeException("test exception"));
+    doReturn(future).when(readFn).getAsync(anyString());
+
+    RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    try {
+      retryIO.getAsync("foo").get();
+      Assert.fail();
+    } catch (ExecutionException e) {
+    }
+
+    // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms maxDelay
+    verify(readFn, atLeast(3)).getAsync(anyString());
+    Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3);
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+  }
+
+  @Test
+  public void testRetryExhaustedAttemptsGet() throws Exception {
+    String tableId = "testRetryExhaustedAttempts";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(5));
+    policy.withStopAfterAttempts(10);
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    doReturn(true).when(readFn).isRetriable(any());
+
+    CompletableFuture<String> future = new CompletableFuture();
+    future.completeExceptionally(new RuntimeException("test exception"));
+    doReturn(future).when(readFn).getAllAsync(any());
+
+    RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    try {
+      retryIO.getAllAsync(Arrays.asList("foo1", "foo2")).get();
+      Assert.fail();
+    } catch (ExecutionException e) {
+    }
+
+    // 1 initial try + 10 retries
+    verify(readFn, times(11)).getAllAsync(any());
+    Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount());
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+  }
+
+  @Test
+  public void testFirstTimeSuccessPut() throws Exception {
+    String tableId = "testFirstTimeSuccessPut";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(100));
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    doReturn(true).when(writeFn).isRetriable(any());
+    doReturn(CompletableFuture.completedFuture("bar")).when(writeFn).putAsync(anyString(), anyString());
+    RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    retryIO.putAsync("foo", "bar").get();
+    verify(writeFn, times(1)).putAsync(anyString(), anyString());
+
+    Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount());
+    Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertEquals(0, retryIO.retryMetrics.retryTimer.getSnapshot().getMax());
+  }
+
+  @Test
+  public void testRetryEngagedPut() throws Exception {
+    String tableId = "testRetryEngagedPut";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(10));
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(any());
+    doReturn(true).when(writeFn).isRetriable(any());
+
+    int [] times = new int[] {0};
+    List<Entry<String, String>> records = new ArrayList<>();
+    records.add(new Entry<>("foo1", "bar1"));
+    records.add(new Entry<>("foo2", "bar2"));
+    doAnswer(invocation -> {
+        CompletableFuture<Map<String, String>> future = new CompletableFuture();
+        if (times[0] > 0) {
+          future.complete(null);
+        } else {
+          times[0]++;
+          future.completeExceptionally(new RuntimeException("test exception"));
+        }
+        return future;
+      }).when(writeFn).putAllAsync(any());
+
+    RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    retryIO.putAllAsync(records).get();
+    verify(writeFn, times(2)).putAllAsync(any());
+
+    Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount());
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+  }
+
+  @Test
+  public void testRetryExhaustedTimePut() throws Exception {
+    String tableId = "testRetryExhaustedTimePut";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(5));
+    policy.withStopAfterDelay(Duration.ofMillis(100));
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    doReturn(true).when(writeFn).isRetriable(any());
+
+    CompletableFuture<String> future = new CompletableFuture();
+    future.completeExceptionally(new RuntimeException("test exception"));
+    doReturn(future).when(writeFn).deleteAsync(anyString());
+
+    RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    try {
+      retryIO.deleteAsync("foo").get();
+      Assert.fail();
+    } catch (ExecutionException e) {
+    }
+
+    // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms maxDelay
+    verify(writeFn, atLeast(3)).deleteAsync(anyString());
+    Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3);
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+  }
+
+  @Test
+  public void testRetryExhaustedAttemptsPut() throws Exception {
+    String tableId = "testRetryExhaustedAttemptsPut";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(5));
+    policy.withStopAfterAttempts(10);
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    doReturn(true).when(writeFn).isRetriable(any());
+
+    CompletableFuture<String> future = new CompletableFuture();
+    future.completeExceptionally(new RuntimeException("test exception"));
+    doReturn(future).when(writeFn).deleteAllAsync(any());
+
+    RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    try {
+      retryIO.deleteAllAsync(Arrays.asList("foo1", "foo2")).get();
+      Assert.fail();
+    } catch (ExecutionException e) {
+    }
+
+    // 1 initial try + 10 retries
+    verify(writeFn, times(11)).deleteAllAsync(any());
+    Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount());
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+  }
+
+  @Test
+  public void testMixedIsRetriablePredicates() throws Exception {
+    String tableId = "testMixedIsRetriablePredicates";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(100));
+
+    // Retry should be attempted based on the custom classification, i.e. retry on NPE
+    policy.withRetryPredicate(ex -> ex instanceof NullPointerException);
+
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+
+    // Table fn classification only retries on IllegalArgumentException
+    doAnswer(arg -> arg.getArgumentAt(0, Throwable.class) instanceof IllegalArgumentException).when(readFn).isRetriable(any());
+
+    int [] times = new int[1];
+    doAnswer(arg -> {
+        if (times[0]++ == 0) {
+          CompletableFuture<String> future = new CompletableFuture();
+          future.completeExceptionally(new NullPointerException("test exception"));
+          return future;
+        } else {
+          return CompletableFuture.completedFuture("bar");
+        }
+      }).when(readFn).getAsync(any());
+
+    RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+
+    Assert.assertEquals("bar", retryIO.getAsync("foo").get());
+
+    verify(readFn, times(2)).getAsync(anyString());
+    Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount());
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+  }
+
+}
diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java
new file mode 100644
index 0000000..c343d63
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.time.Duration;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import net.jodah.failsafe.RetryPolicy;
+
+
+public class TestTableRetryPolicy {
+  @Test
+  public void testNoRetry() {
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    Assert.assertEquals(TableRetryPolicy.BackoffType.NONE, retryPolicy.getBackoffType());
+  }
+
+  @Test
+  public void testFixedRetry() {
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    retryPolicy.withFixedBackoff(Duration.ofMillis(1000));
+    retryPolicy.withJitter(Duration.ofMillis(100));
+    retryPolicy.withStopAfterAttempts(4);
+    Assert.assertEquals(TableRetryPolicy.BackoffType.FIXED, retryPolicy.getBackoffType());
+    RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy);
+    Assert.assertEquals(1000, fsRetry.getDelay().toMillis());
+    Assert.assertEquals(100, fsRetry.getJitter().toMillis());
+    Assert.assertEquals(4, fsRetry.getMaxRetries());
+    Assert.assertNotNull(retryPolicy.getRetryPredicate());
+  }
+
+  @Test
+  public void testRandomRetry() {
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    retryPolicy.withRandomBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000));
+    retryPolicy.withJitter(Duration.ofMillis(100)); // no-op
+    Assert.assertEquals(TableRetryPolicy.BackoffType.RANDOM, retryPolicy.getBackoffType());
+    RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy);
+    Assert.assertEquals(1000, fsRetry.getDelayMin().toMillis());
+    Assert.assertEquals(2000, fsRetry.getDelayMax().toMillis());
+  }
+
+  @Test
+  public void testExponentialRetry() {
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    retryPolicy.withExponentialBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000), 1.5);
+    retryPolicy.withJitter(Duration.ofMillis(100));
+    Assert.assertEquals(TableRetryPolicy.BackoffType.EXPONENTIAL, retryPolicy.getBackoffType());
+    RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy);
+    Assert.assertEquals(1000, fsRetry.getDelay().toMillis());
+    Assert.assertEquals(2000, fsRetry.getMaxDelay().toMillis());
+    Assert.assertEquals(1.5, fsRetry.getDelayFactor(), 0.001);
+    Assert.assertEquals(100, fsRetry.getJitter().toMillis());
+  }
+
+  @Test
+  public void testCustomRetryPredicate() {
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    retryPolicy.withRetryPredicate((e) -> e instanceof IllegalArgumentException);
+    Assert.assertTrue(retryPolicy.getRetryPredicate().test(new IllegalArgumentException()));
+    Assert.assertFalse(retryPolicy.getRetryPredicate().test(new NullPointerException()));
+  }
+}
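
The mapping these tests exercise can also be seen end to end in a small standalone snippet. This is a sketch, not part of the patch: it assumes FailsafeAdapter and the policy setters are reachable from the org.apache.samza.table.retry package, and it only calls getters that the tests above already use.

package org.apache.samza.table.retry;

import java.time.Duration;

import net.jodah.failsafe.RetryPolicy;

// Sketch: build a TableRetryPolicy and inspect the Failsafe policy it maps to.
public class RetryPolicyMappingSketch {
  public static void main(String[] args) {
    TableRetryPolicy policy = new TableRetryPolicy();
    policy.withExponentialBackoff(Duration.ofMillis(50), Duration.ofSeconds(1), 2.0);
    policy.withJitter(Duration.ofMillis(10));
    policy.withStopAfterAttempts(5);

    RetryPolicy fsPolicy = FailsafeAdapter.valueOf(policy);
    System.out.println("delay(ms)    = " + fsPolicy.getDelay().toMillis());
    System.out.println("maxDelay(ms) = " + fsPolicy.getMaxDelay().toMillis());
    System.out.println("delayFactor  = " + fsPolicy.getDelayFactor());
    System.out.println("jitter(ms)   = " + fsPolicy.getJitter().toMillis());
    System.out.println("maxRetries   = " + fsPolicy.getMaxRetries());
  }
}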
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
index 28a1bac..0d50f26 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -37,6 +37,7 @@
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
@@ -221,6 +222,17 @@
     if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) {
       return null;
     }
+    /*
+     * Properly handle the END_OF_STREAM offset here. If both offsets are
+     * END_OF_STREAM, they are equal. Otherwise END_OF_STREAM is always
+     * greater than any other offset.
+     */
+    if (offset1.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
+      return offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET) ? 0 : 1;
+    }
+    if (offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
+      return -1;
+    }
     int fileIndex1 = MultiFileHdfsReader.getCurFileIndex(offset1);
     int fileIndex2 = MultiFileHdfsReader.getCurFileIndex(offset2);
     if (fileIndex1 == fileIndex2) {
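
The ordering this hunk introduces can be summarized in a short sketch. The admin parameter stands in for any SystemAdmin with this behavior, and the "0:42" offset literal is only a placeholder for a regular HDFS offset.

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;

// Sketch: expected comparator results once END_OF_STREAM is handled.
final class EndOfStreamOrderingSketch {
  static void illustrate(SystemAdmin admin) {
    String eos = IncomingMessageEnvelope.END_OF_STREAM_OFFSET;
    assert admin.offsetComparator(eos, eos) == 0;    // both at end of stream: equal
    assert admin.offsetComparator(eos, "0:42") > 0;  // END_OF_STREAM after any real offset
    assert admin.offsetComparator("0:42", eos) < 0;  // real offsets before END_OF_STREAM
  }
}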
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 5aaad26..7071b39 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -45,6 +45,7 @@
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.table.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,7 +114,8 @@
             tableFieldNames);
 
     Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
-    Serde<SamzaSqlRelMessage> valueSerde = new JsonSerdeV2<>(SamzaSqlRelMessage.class);
+    SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+        (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
 
     // Always re-partition the messages from the input stream by the composite key and then join the messages
     // with the table.
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 477d5b8..033bcdf 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -191,7 +191,8 @@
   public TestRunner addOverrideConfig(String key, String value) {
     Preconditions.checkNotNull(key);
     Preconditions.checkNotNull(value);
-    configs.put(key, value);
+    String configKeyPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), JOB_NAME);
+    configs.put(String.format("%s%s", configKeyPrefix, key), value);
     return this;
   }
 
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 3b2d08a..b249d4d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -87,9 +87,9 @@
   private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
   private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
   private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
-  private static final String TASK_SHUTDOWN_MS = "2000";
-  private static final String JOB_DEBOUNCE_TIME_MS = "2000";
-  private static final String BARRIER_TIMEOUT_MS = "2000";
+  private static final String TASK_SHUTDOWN_MS = "10000";
+  private static final String JOB_DEBOUNCE_TIME_MS = "10000";
+  private static final String BARRIER_TIMEOUT_MS = "10000";
   private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};
 
   private String inputKafkaTopic;
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 0d9df8b..5c067ad 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -32,6 +32,7 @@
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.serializers.IntegerSerde;
@@ -66,8 +67,7 @@
         Arrays.asList(TestTableData.generateProfiles(10)));
   }
 
-  // @Test
-  // TODO: re-enable after fixing the coordinator stream issue in SAMZA-1786
+  @Test
   public void testJoinWithDurableSideInputTable() {
     runTest(
         "durable-side-input",
@@ -98,6 +98,7 @@
         .addInputStream(profileStream)
         .addOutputStream(outputStream)
         .addConfigs(new MapConfig(configs))
+        .addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString())
         .run(Duration.ofMillis(100000));
 
     try {
@@ -116,7 +117,6 @@
       assertEquals("Mismatch between the expected and actual join count", results.size(),
           expectedEnrichedPageviews.size());
       assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin);
-
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index d79683e..4cf99ff 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -92,6 +92,11 @@
       return CompletableFuture.completedFuture(profileMap.get(key));
     }
 
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+
     static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) {
       return new InMemoryReadFunction(serializedProfiles);
     }
@@ -124,6 +129,11 @@
       records.remove(key);
       return CompletableFuture.completedFuture(null);
     }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
   }
 
   private <K, V> Table<KV<K, V>> getCachingTable(Table<KV<K, V>> actualTable, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
@@ -142,6 +152,18 @@
     return appDesc.getTable(cachingDesc);
   }
 
+  static class MyReadFunction implements TableReadFunction {
+    @Override
+    public CompletableFuture getAsync(Object key) {
+      return null;
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
   private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception {
     final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
 
@@ -166,9 +188,12 @@
           .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
           .withRateLimiter(readRateLimiter, null, null);
 
+      // dummy reader
+      TableReadFunction readFn = new MyReadFunction();
+
       RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
       outputTableDesc
-          .withReadFunction(key -> null) // dummy reader
+          .withReadFunction(readFn)
           .withWriteFunction(writer)
           .withRateLimiter(writeRateLimiter, null, null);
 
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
index 41b6509..34ffbd4 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
@@ -24,6 +24,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.ConfigRewriter;
@@ -116,15 +118,27 @@
   public static class MySampleNonTableDescriptorsProvider {
   }
 
+  static class MyReadFunction implements TableReadFunction {
+    @Override
+    public CompletableFuture getAsync(Object key) {
+      return null;
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
   public static class MySampleTableDescriptorsProvider implements TableDescriptorsProvider {
     @Override
     public List<TableDescriptor> getTableDescriptors(Config config) {
       List<TableDescriptor> tableDescriptors = new ArrayList<>();
       final RateLimiter readRateLimiter = mock(RateLimiter.class);
-      final TableReadFunction readRemoteTable = (TableReadFunction) key -> null;
+      final MyReadFunction readFn = new MyReadFunction();
 
       tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
-          .withReadFunction(readRemoteTable)
+          .withReadFunction(readFn)
           .withRateLimiter(readRateLimiter, null, null)
           .withSerde(KVSerde.of(new StringSerde(), new LongSerde())));
       tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")