Changed StreamOperatorTask to throw appropriate Exception when InputOperator is not found for a SystemStream in the OperatorGraph (#1483)

Symptom:
Samza job fails with a TaskCallbackTimeoutException if InputOperator is not found in OperatorGraph for a SystemStream

Cause:
Currently, the pipeline silently fails when there is no InputOperator for a SystemStream in the OperatorGraph as the code doesn't handle such cases. The pipeline just silently fails and the job would throw an exception due to sync task callback timeout (TaskCallbackTimeoutException).
One of the scenarios that causes this is due to non-determinism in the plan seen by the containers and the AM. It can happen in multiple scenarios where the transform/operators have non-determinism in generating their names. e.g. we noticed beam transform appended hashcode.

Changes:
Call failure method of the TaskCallback with a SamzaException with the appropriate exception message.
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 5a474cd..0079fab 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.task;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -146,6 +147,14 @@
               callback.complete();
             }
           });
+        } else {
+          // If InputOperator is not found in the operator graph for a given SystemStream, throw an exception else the
+          // job will timeout due to async task callback timeout (TaskCallbackTimeoutException)
+          final String errMessage = String.format("InputOperator not found in OperatorGraph for %s. The available input"
+              + " operators are: %s. Please check SystemStream configuration for the `SystemConsumer` and/or task.inputs"
+              + " task configuration.", systemStream, operatorImplGraph.getAllInputOperators());
+          LOG.error(errMessage);
+          callback.failure(new SamzaException(errMessage));
         }
       } catch (Exception e) {
         LOG.error("Failed to process the incoming message due to ", e);
@@ -184,6 +193,14 @@
     this.taskThreadPool = taskThreadPool;
   }
 
+  /**
+   * Package private setter for private var operatorImplGraph to be used in TestStreamOperatorTask tests.
+   * */
+  @VisibleForTesting
+  void setOperatorImplGraph(OperatorImplGraph operatorImplGraph) {
+    this.operatorImplGraph = operatorImplGraph;
+  }
+
   /* package private for testing */
   OperatorImplGraph getOperatorImplGraph() {
     return this.operatorImplGraph;
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
index 2d43c63..2ad9f7a 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
@@ -22,17 +22,27 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.JobContext;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.impl.OperatorImplGraph;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.Clock;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
-
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 
 public class TestStreamOperatorTask {
   public static OperatorImplGraph getOperatorImplGraph(StreamOperatorTask task) {
@@ -80,4 +90,30 @@
         mockTaskCoordinator, mockTaskCallback);
     failureLatch.await();
   }
+
+  /**
+   * Tests if the appropriate SamzaException is propagated to the TaskCallback if there is no InputOperator for a given
+   * SystemStream in the OperatorGraph.
+   * */
+  @Test
+  public void testExceptionIfInputOperatorMissing() throws NoSuchFieldException, IllegalAccessException {
+    IncomingMessageEnvelope mockIme = mock(IncomingMessageEnvelope.class, RETURNS_DEEP_STUBS);
+    SystemStream testSystemStream = new SystemStream("foo", "bar");
+    when(mockIme.getSystemStreamPartition().getSystemStream()).thenReturn(testSystemStream);
+
+    OperatorImplGraph mockOperatorImplGraph = mock(OperatorImplGraph.class);
+    when(mockOperatorImplGraph.getInputOperator(anyObject())).thenReturn(null);
+    StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class));
+    operatorTask.setOperatorImplGraph(mockOperatorImplGraph);
+    TaskCallback mockTaskCallback = mock(TaskCallback.class);
+    operatorTask.processAsync(mockIme, mock(MessageCollector.class), mock(TaskCoordinator.class), mockTaskCallback);
+
+    ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
+    verify(mockTaskCallback, only()).failure(throwableCaptor.capture());
+    assertEquals(throwableCaptor.getValue().getClass(), SamzaException.class);
+    String expectedErrMessage = String.format("InputOperator not found in OperatorGraph for %s. The available input"
+        + " operators are: %s. Please check SystemStream configuration for the `SystemConsumer` and/or task.inputs"
+        + " task configuration.", testSystemStream, mockOperatorImplGraph.getAllInputOperators());
+    assertEquals(throwableCaptor.getValue().getMessage(), expectedErrMessage);
+  }
 }