APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index 81dc96e..a5fc3a5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -185,7 +185,11 @@
if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
// input attributes of the downstream operator
for (InputPortMeta sink : streamMeta.getSinks()) {
- portInfo.contextAttributes = sink.getAttributes();
+ try {
+ portInfo.contextAttributes = sink.getAttributes().clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Cannot clone attributes", e);
+ }
break;
}
}
@@ -199,7 +203,7 @@
// Create mappings for all non-inline operators
if (input.target.getContainer() != out.source.getContainer()) {
InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input);
- StreamCodec<?> streamCodecInfo = getStreamCodec(inputPortMeta);
+ StreamCodec<?> streamCodecInfo = inputPortMeta.getStreamCodec();
Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
if (!portInfo.streamCodecs.containsKey(id)) {
portInfo.streamCodecs.put(id, streamCodecInfo);
@@ -231,11 +235,19 @@
InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta);
if (inputPortMeta != null) {
- inputInfo.contextAttributes = inputPortMeta.getAttributes();
+ try {
+ inputInfo.contextAttributes = inputPortMeta.getAttributes().clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Cannot clone attributes", e);
+ }
}
if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
- inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes();
+ try {
+ inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Cannot clone attributes", e);
+ }
}
inputInfo.sourceNodeId = sourceOutput.source.getId();
@@ -272,7 +284,7 @@
// On the input side there is a unlikely scenario of partitions even for inline stream that is being
// handled. Always specifying a stream codec configuration in case that scenario happens.
InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in);
- StreamCodec<?> streamCodecInfo = getStreamCodec(idInputPortMeta);
+ StreamCodec<?> streamCodecInfo = idInputPortMeta.getStreamCodec();
Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
inputInfo.streamCodecs.put(id, streamCodecInfo);
ndi.inputs.add(inputInfo);
@@ -327,23 +339,6 @@
return operator;
}
- public static StreamCodec<?> getStreamCodec(InputPortMeta inputPortMeta)
- {
- if (inputPortMeta != null) {
- StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC);
- if (codec == null) {
- // it cannot be this object that gets returned. Depending on this value is dangerous
- codec = inputPortMeta.getPortObject().getStreamCodec();
- if (codec != null) {
- // don't create codec multiple times - it will assign a new identifier
- inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec);
- }
- }
- return codec;
- }
- return null;
- }
-
/**
* Create deploy info for operator.
* <p>
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index aa79243..2fe8107 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2146,7 +2146,7 @@
}
for (InputPortMeta ipm : out.logicalStream.getSinks()) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+ StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo);
// following needs to match the concat logic in StreamingContainer
String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString());
@@ -2243,7 +2243,7 @@
for (PTOperator.PTOutput out : operator.getOutputs()) {
if (!out.isDownStreamInline()) {
for (InputPortMeta ipm : out.logicalStream.getSinks()) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+ StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo);
// following needs to match the concat logic in StreamingContainer
String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString());
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 3825505..01d9a1c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -61,6 +61,8 @@
import com.datatorrent.stram.engine.DefaultUnifier;
import com.datatorrent.stram.engine.Slider;
+import static com.datatorrent.api.Context.PortContext.STREAM_CODEC;
+
/**
* DAG contains the logical declarations of operators and streams.
* <p>
@@ -270,6 +272,20 @@
throw new UnsupportedOperationException("Not supported yet.");
}
+ public StreamCodec<?> getStreamCodec()
+ {
+ return attributes.get(STREAM_CODEC);
+ }
+
+ void setStreamCodec(StreamCodec<?> streamCodec)
+ {
+ if (streamCodec != null) {
+ StreamCodec<?> oldStreamCodec = attributes.put(STREAM_CODEC, streamCodec);
+ if (oldStreamCodec != null && oldStreamCodec != streamCodec) {
+ LOG.warn("Input port {} stream codec was changed from {} to {}", this.getPortName(), oldStreamCodec, streamCodec);
+ }
+ }
+ }
}
public final class OutputPortMeta implements DAG.OutputPortMeta, Serializable
@@ -681,14 +697,14 @@
private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort<?> port)
{
- StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null ? (StreamCodec<Object>) sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) sinkToPersistPortMeta.getPortObject().getStreamCodec();
+ StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(STREAM_CODEC) != null ? (StreamCodec<Object>) sinkToPersistPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>) sinkToPersistPortMeta.getPortObject().getStreamCodec();
if (inputStreamCodec != null) {
Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<InputPortMeta, StreamCodec<Object>>();
codecs.put(sinkToPersistPortMeta, inputStreamCodec);
InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
- StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) != null) ? (StreamCodec<Object>) persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) port.getStreamCodec();
+ StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(STREAM_CODEC) != null) ? (StreamCodec<Object>) persistOperatorPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>) port.getStreamCodec();
StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
- setInputPortAttribute(port, PortContext.STREAM_CODEC, codec);
+ setInputPortAttribute(port, STREAM_CODEC, codec);
}
}
@@ -980,6 +996,12 @@
metaPort.adqAnnotation = adqAnnotation;
inPortMap.put(portObject, metaPort);
markInputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass());
+ if (metaPort.getStreamCodec() == null) {
+ metaPort.setStreamCodec(portObject.getStreamCodec());
+ } else if (portObject.getStreamCodec() != null) {
+ LOG.info("Input port {} attribute {} overrides codec {} with {} codec", metaPort.getPortName(), STREAM_CODEC.getSimpleName(),
+ portObject.getStreamCodec(), metaPort.getStreamCodec());
+ }
}
@Override
@@ -1612,7 +1634,7 @@
}
for (StreamMeta n: this.streams.values()) {
for (InputPortMeta sink : n.getSinks()) {
- StreamCodec<?> streamCodec = sink.getValue(PortContext.STREAM_CODEC);
+ StreamCodec<?> streamCodec = sink.getValue(STREAM_CODEC);
if (streamCodec != null) {
classNames.add(streamCodec.getClass().getName());
} else {
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index f30ceb6..7efce6c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -38,7 +38,6 @@
import com.datatorrent.api.StreamCodec;
import com.datatorrent.common.util.Pair;
-import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
@@ -230,7 +229,7 @@
boolean separateUnifiers = false;
Integer lastId = null;
for (InputPortMeta ipm : streamMeta.getSinks()) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+ StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
Integer id = plan.getStreamCodecIdentifier(streamCodecInfo);
if (lastId == null) {
lastId = id;
@@ -249,7 +248,7 @@
unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
} else {
for (InputPortMeta ipm : streamMeta.getSinks()) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+ StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
if (!cascadeUnifierSourcesMap.containsKey(streamCodecInfo)) {
unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
cascadeUnifierSourcesMap.put(streamCodecInfo, unifierSources);
@@ -303,7 +302,7 @@
unifier.inputs.clear();
List<PTOutput> doperUnifierSources = unifierSources;
if (separateUnifiers) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(doperEntry.second);
+ StreamCodec<?> streamCodecInfo = doperEntry.second.getStreamCodec();
List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(streamCodecInfo);
if (cascadeSources != null) {
doperUnifierSources = cascadeSources;
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index ddf3448..da5456e 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1152,7 +1152,7 @@
Map<Integer, StreamCodec<?>> streamCodecs,
String id, PhysicalPlan plan )
{
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(operatorMeta.getMeta(inputPort));
+ StreamCodec<?> streamCodecInfo = operatorMeta.getMeta(inputPort).getStreamCodec();
Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodecInfo, plan));
Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodecInfo);
checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodecInfo);