APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index 81dc96e..a5fc3a5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -185,7 +185,11 @@
         if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
           // input attributes of the downstream operator
           for (InputPortMeta sink : streamMeta.getSinks()) {
-            portInfo.contextAttributes = sink.getAttributes();
+            try {
+              portInfo.contextAttributes = sink.getAttributes().clone();
+            } catch (CloneNotSupportedException e) {
+              throw new RuntimeException("Cannot clone attributes", e);
+            }
             break;
           }
         }
@@ -199,7 +203,7 @@
             // Create mappings for all non-inline operators
             if (input.target.getContainer() != out.source.getContainer()) {
               InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input);
-              StreamCodec<?> streamCodecInfo = getStreamCodec(inputPortMeta);
+              StreamCodec<?> streamCodecInfo = inputPortMeta.getStreamCodec();
               Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
               if (!portInfo.streamCodecs.containsKey(id)) {
                 portInfo.streamCodecs.put(id, streamCodecInfo);
@@ -231,11 +235,19 @@
         InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta);
 
         if (inputPortMeta != null) {
-          inputInfo.contextAttributes = inputPortMeta.getAttributes();
+          try {
+            inputInfo.contextAttributes = inputPortMeta.getAttributes().clone();
+          } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Cannot clone attributes", e);
+          }
         }
 
         if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
-          inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes();
+          try {
+            inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone();
+          } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Cannot clone attributes", e);
+          }
         }
 
         inputInfo.sourceNodeId = sourceOutput.source.getId();
@@ -272,7 +284,7 @@
         // On the input side there is a unlikely scenario of partitions even for inline stream that is being
         // handled. Always specifying a stream codec configuration in case that scenario happens.
         InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in);
-        StreamCodec<?> streamCodecInfo = getStreamCodec(idInputPortMeta);
+        StreamCodec<?> streamCodecInfo = idInputPortMeta.getStreamCodec();
         Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
         inputInfo.streamCodecs.put(id, streamCodecInfo);
         ndi.inputs.add(inputInfo);
@@ -327,23 +339,6 @@
     return operator;
   }
 
-  public static StreamCodec<?> getStreamCodec(InputPortMeta inputPortMeta)
-  {
-    if (inputPortMeta != null) {
-      StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC);
-      if (codec == null) {
-        // it cannot be this object that gets returned. Depending on this value is dangerous 
-        codec = inputPortMeta.getPortObject().getStreamCodec();
-        if (codec != null) {
-          // don't create codec multiple times - it will assign a new identifier
-          inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec);
-        }
-      }
-      return codec;
-    }
-    return null;
-  }
-
   /**
    * Create deploy info for operator.
    * <p>
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index aa79243..2fe8107 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2146,7 +2146,7 @@
           }
 
           for (InputPortMeta ipm : out.logicalStream.getSinks()) {
-            StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+            StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
             Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo);
             // following needs to match the concat logic in StreamingContainer
             String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString());
@@ -2243,7 +2243,7 @@
             for (PTOperator.PTOutput out : operator.getOutputs()) {
               if (!out.isDownStreamInline()) {
                 for (InputPortMeta ipm : out.logicalStream.getSinks()) {
-                  StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+                  StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
                   Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo);
                   // following needs to match the concat logic in StreamingContainer
                   String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString());
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 3825505..01d9a1c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -61,6 +61,8 @@
 import com.datatorrent.stram.engine.DefaultUnifier;
 import com.datatorrent.stram.engine.Slider;
 
+import static com.datatorrent.api.Context.PortContext.STREAM_CODEC;
+
 /**
  * DAG contains the logical declarations of operators and streams.
  * <p>
@@ -270,6 +272,20 @@
       throw new UnsupportedOperationException("Not supported yet.");
     }
 
+    public StreamCodec<?> getStreamCodec()
+    {
+      return attributes.get(STREAM_CODEC);
+    }
+
+    void setStreamCodec(StreamCodec<?> streamCodec)
+    {
+      if (streamCodec != null) {
+        StreamCodec<?> oldStreamCodec = attributes.put(STREAM_CODEC, streamCodec);
+        if (oldStreamCodec != null && oldStreamCodec != streamCodec) {
+          LOG.warn("Input port {} stream codec was changed from {} to {}", this.getPortName(), oldStreamCodec, streamCodec);
+        }
+      }
+    }
   }
 
   public final class OutputPortMeta implements DAG.OutputPortMeta, Serializable
@@ -681,14 +697,14 @@
 
     private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort<?> port)
     {
-      StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null ? (StreamCodec<Object>) sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) sinkToPersistPortMeta.getPortObject().getStreamCodec();
+      StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(STREAM_CODEC) != null ? (StreamCodec<Object>) sinkToPersistPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>) sinkToPersistPortMeta.getPortObject().getStreamCodec();
       if (inputStreamCodec != null) {
         Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<InputPortMeta, StreamCodec<Object>>();
         codecs.put(sinkToPersistPortMeta, inputStreamCodec);
         InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
-        StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) != null) ? (StreamCodec<Object>) persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) port.getStreamCodec();
+        StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(STREAM_CODEC) != null) ? (StreamCodec<Object>) persistOperatorPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>) port.getStreamCodec();
         StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
-        setInputPortAttribute(port, PortContext.STREAM_CODEC, codec);
+        setInputPortAttribute(port, STREAM_CODEC, codec);
       }
     }
 
@@ -980,6 +996,12 @@
         metaPort.adqAnnotation = adqAnnotation;
         inPortMap.put(portObject, metaPort);
         markInputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass());
+        if (metaPort.getStreamCodec() == null) {
+          metaPort.setStreamCodec(portObject.getStreamCodec());
+        } else if (portObject.getStreamCodec() != null) {
+          LOG.info("Input port {} attribute {} overrides codec {} with {} codec", metaPort.getPortName(), STREAM_CODEC.getSimpleName(),
+              portObject.getStreamCodec(), metaPort.getStreamCodec());
+        }
       }
 
       @Override
@@ -1612,7 +1634,7 @@
     }
     for (StreamMeta n: this.streams.values()) {
       for (InputPortMeta sink : n.getSinks()) {
-        StreamCodec<?> streamCodec = sink.getValue(PortContext.STREAM_CODEC);
+        StreamCodec<?> streamCodec = sink.getValue(STREAM_CODEC);
         if (streamCodec != null) {
           classNames.add(streamCodec.getClass().getName());
         } else {
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index f30ceb6..7efce6c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -38,7 +38,6 @@
 import com.datatorrent.api.StreamCodec;
 
 import com.datatorrent.common.util.Pair;
-import com.datatorrent.stram.StreamingContainerAgent;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
@@ -230,7 +229,7 @@
       boolean separateUnifiers = false;
       Integer lastId = null;
       for (InputPortMeta ipm : streamMeta.getSinks()) {
-        StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+        StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
         Integer id = plan.getStreamCodecIdentifier(streamCodecInfo);
         if (lastId == null) {
           lastId = id;
@@ -249,7 +248,7 @@
           unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
         } else {
           for (InputPortMeta ipm : streamMeta.getSinks()) {
-            StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+            StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
             if (!cascadeUnifierSourcesMap.containsKey(streamCodecInfo)) {
               unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
               cascadeUnifierSourcesMap.put(streamCodecInfo, unifierSources);
@@ -303,7 +302,7 @@
             unifier.inputs.clear();
             List<PTOutput> doperUnifierSources = unifierSources;
             if (separateUnifiers) {
-              StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(doperEntry.second);
+              StreamCodec<?> streamCodecInfo = doperEntry.second.getStreamCodec();
               List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(streamCodecInfo);
               if (cascadeSources != null) {
                 doperUnifierSources = cascadeSources;
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index ddf3448..da5456e 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1152,7 +1152,7 @@
                                        Map<Integer, StreamCodec<?>> streamCodecs,
                                        String id, PhysicalPlan plan )
   {
-    StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(operatorMeta.getMeta(inputPort));
+    StreamCodec<?> streamCodecInfo = operatorMeta.getMeta(inputPort).getStreamCodec();
     Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodecInfo, plan));
     Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodecInfo);
     checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodecInfo);