Custom control tuple support in Apache Apex gives the user the capability to insert user defined control tuples in the data flow. By analogy, the engine already supports a few pre-defined control tuples like BEGIN_WINDOW, END_WINDOW, etc. Until now, applications had no way to insert their own control tuples.
All discussion in this document is related to Control Tuples generated by user defined logic. The document may refer to these tuples as Control Tuples, User Defined Control Tuples or Custom Control Tuples interchangeably.
A user defined control tuple can be any user defined object which implements the ControlTuple interface.
See Delivery Semantics for details on DeliveryType.
    public interface ControlTuple {
      DeliveryType getDeliveryType();

      enum DeliveryType {
        IMMEDIATE,
        END_WINDOW
      }
    }
Example user defined control tuple:
    public class TestControlTuple implements ControlTuple {
      public long data;
      public boolean immediate;

      // For Kryo
      public TestControlTuple() {
        data = 0;
      }

      // Constructor
      public TestControlTuple(long data, boolean immediate) {
        this.data = data;
        this.immediate = immediate;
      }

      @Override
      public DeliveryType getDeliveryType() {
        if (immediate) {
          return DeliveryType.IMMEDIATE;
        } else {
          return DeliveryType.END_WINDOW;
        }
      }
    }
A control tuple may be used in an application to trigger some sort of action in a downstream operator. For example, the source operator might want to notify the last operator that it has emitted all the data in a file and that the file has now ended. Let's call this an End-Of-File control tuple. Once the last operator gets the End-Of-File tuple, it would, say, close the destination file it was writing and create a new file.
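The End-Of-File scenario above can be sketched roughly as follows. This is only an illustration, not engine code: the ControlTuple interface is redeclared locally as a stand-in so the sketch compiles without apex-core, and EndOfFileTuple and RollingWriter are hypothetical names invented for this example. A real operator would receive the tuple through a control aware input port instead of a plain method call.

```java
import java.util.ArrayList;
import java.util.List;

class EndOfFileSketch {
    // Local stand-in mirroring the ControlTuple interface (assumption: not the real apex-core type).
    interface ControlTuple {
        DeliveryType getDeliveryType();

        enum DeliveryType { IMMEDIATE, END_WINDOW }
    }

    // Hypothetical End-Of-File marker naming the source file that just ended.
    static class EndOfFileTuple implements ControlTuple {
        String fileName;

        // No-arg constructor, as a Kryo-serialized tuple would need.
        EndOfFileTuple() { }

        EndOfFileTuple(String fileName) { this.fileName = fileName; }

        @Override
        public DeliveryType getDeliveryType() { return DeliveryType.END_WINDOW; }
    }

    // Hypothetical last operator: gathers lines for the current file and rolls over on End-Of-File.
    static class RollingWriter {
        final List<List<String>> closedFiles = new ArrayList<>();
        private List<String> current = new ArrayList<>();

        void process(String line) { current.add(line); }

        boolean processControl(ControlTuple tuple) {
            if (tuple instanceof EndOfFileTuple) {
                closedFiles.add(current);     // "close" the destination file
                current = new ArrayList<>();  // start writing a new one
            }
            return false; // leave propagation to the engine
        }
    }

    public static void main(String[] args) {
        RollingWriter writer = new RollingWriter();
        writer.process("a");
        writer.process("b");
        writer.processControl(new EndOfFileTuple("first.txt"));
        writer.process("c"); // belongs to the next file, which is still open
        System.out.println(writer.closedFiles);
    }
}
```

In memory lists stand in for files here so that the roll-over logic stays visible; the file handling itself is outside the scope of the sketch.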
More use cases which were discussed during the requirements of this feature are as follows:
There is no restriction on which operator in the DAG can or cannot generate a control tuple. The operator which needs to generate a control tuple should declare a port whose type is ControlAwareDefaultOutputPort; the user can then simply call the emitControl(ControlTuple t) method on this port.
Example: In the code snippet below, the Generator operator declares a ControlAwareDefaultOutputPort called output which can emit a data tuple as well as a control tuple.
    public class Generator extends BaseOperator implements InputOperator {
      private double data;        // next data value; double so that it boxes to the port's Double type
      private long count;         // number of data tuples emitted so far
      private boolean immediate;  // delivery type requested for the emitted control tuples

      public final transient ControlAwareDefaultOutputPort<Double> output = new ControlAwareDefaultOutputPort<>();

      @Override
      public void emitTuples() {
        // Can emit a data tuple using output.emit()
        output.emit(data++);
        count++;
      }

      @Override
      public void endWindow() {
        // Can also emit a control tuple using output.emitControl()
        output.emitControl(new TestControlTuple(count, immediate));
      }
    }
Note - User defined control tuples and control aware ports can only be used in operators which use the apex-core dependency which has control tuple support, viz. 3.6.0 or above. Previous versions of apex-core would not be able to support an application which uses user defined control tuples or control aware ports and would crash at launch time.
Any downstream operator which wants to receive a user defined control tuple should declare an input port which is Control Aware. A ControlAwareDefaultInputPort has the necessary capability to process a control tuple in addition to a regular data tuple.
Example: The code snippet below illustrates the use of the processControl method of ControlAwareDefaultInputPort to receive and handle user defined control tuples.
    public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>() {
      // Process a data tuple
      @Override
      public void process(Double tuple) {
        output.emit(tuple);
      }

      // Process a control tuple
      @Override
      public boolean processControl(ControlTuple userControlTuple) {
        // process control tuple here

        // The return value indicates whether or not the engine should
        // propagate the tuple automatically to downstream operators.
        // Discussed in later sections.
        return false;
      }
    };
Note that the pre-defined control tuples like BEGIN_WINDOW and END_WINDOW are not handled by the processControl() method, since these are used only by the engine and are not meant to be delivered to user logic in operators. Custom control tuples, on the other hand, are generated by the operators and need to be delivered to downstream operators.
processControl
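Following are the semantics of its boolean return value: returning true means the operator takes charge of propagation itself, either by propagating the control tuple explicitly or by blocking it; returning false means the engine propagates the control tuple automatically to downstream operators.
See Propagation of Control Tuples for more details.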
A control tuple generated by some operator of the application needs to traverse the same path as that traversed by other data tuples transmitted by the application. For this reason, similar to the other data tuples, the control tuple needs to be Kryo serializable since the default serializer used by the platform is Kryo.
A control tuple emitted by an operator can be propagated downstream automatically. This is in line with the automatic propagation of other pre-defined control tuples in the engine. However, some use cases require that the control tuple not be propagated further in the DAG. We support this behavior for user defined control tuples.
Once the control tuple is processed in the processControl method, a return value is expected by the engine. This return value indicates whether the operator wishes to handle the propagation of the control tuple itself or let the engine proceed with the default auto-propagation of the control tuple.
The processControl method of the ControlAwareDefaultInputPort returns a boolean.
    @Override
    public boolean processControl(ControlTuple userControlTuple) {
      // process userControlTuple here

      // return true if operator wants to propagate explicitly or block propagation
      // return false if operator wants engine to propagate automatically
      return false;
    }
For operators without Control Aware ports, the platform forwards the control tuples to the downstream operators automatically. The application writer / user does not have to worry about handling a control tuple which is generated upstream. Only operators with Control Aware ports are delivered the control tuple via the processControl method. This also allows the existing operators to be backward compatible.
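The propagation rules described above can be summarized in a small decision function. The sketch below is a simplified model and not the engine's actual implementation: ControlTuple and ControlAwarePort are local stand-ins, engineForwards is a hypothetical helper, and a null port is used (as an assumption of this sketch) to represent an operator that has no control aware port.

```java
class PropagationSketch {
    // Local stand-in for a control tuple (assumption: not the real apex-core type).
    interface ControlTuple { }

    // Local stand-in for the control handling side of a control aware input port.
    interface ControlAwarePort {
        boolean processControl(ControlTuple tuple);
    }

    // Hypothetical helper: decides whether the engine forwards the tuple downstream on its own.
    // A null port models an operator without any control aware port.
    static boolean engineForwards(ControlAwarePort port, ControlTuple tuple) {
        if (port == null) {
            return true; // operator never sees the tuple; the platform forwards it
        }
        boolean operatorHandlesPropagation = port.processControl(tuple);
        return !operatorHandlesPropagation; // true from the operator means "engine, stay out"
    }

    public static void main(String[] args) {
        ControlTuple tuple = new ControlTuple() { };
        System.out.println(engineForwards(null, tuple));       // operator is not control aware
        System.out.println(engineForwards(t -> false, tuple)); // operator defers to the engine
        System.out.println(engineForwards(t -> true, tuple));  // operator propagates or blocks itself
    }
}
```

When processControl returns true, whether the tuple travels further depends entirely on the operator calling emitControl on one of its own control aware output ports.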
Delivery mechanism refers to the time, relative to the processing window, at which a control tuple is delivered to the operator. An operator has various callbacks like setup, beginWindow, endWindow, etc.
With the IMMEDIATE delivery type, as the name implies, the control tuple is delivered immediately to the next downstream operator if that operator is control aware; otherwise it is forwarded on to the operators further downstream.
Case: Downstream is partitioned
When the downstream is partitioned, the control tuple with IMMEDIATE delivery type would go to all the downstream partitions. This holds, irrespective of whether or not the control tuple was generated by the immediately upstream operator or even further upstream.
Case: Upstream is partitioned
When the upstream is partitioned and the control tuple is generated in any subset of the partitions, the downstream operator receives the control tuple immediately and does not wait until the end of the current window. If the source of the control tuple was a single operator further upstream and multiple copies were generated by the intermediate partitions, the duplicate copies of the control tuple are filtered out at the downstream operator. Thus only unique control tuples are delivered to the downstream operator. Further, in case of IMMEDIATE delivery, the first instance of the control tuple is delivered to the operator and the duplicates are filtered out.
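The duplicate filtering for IMMEDIATE delivery can be pictured with the sketch below. It is a toy model under stated assumptions: the text above does not say how the engine recognizes two copies as the same control tuple, so the sketch assumes a hypothetical id assigned once at the origin and carried by every copy. Envelope and DownstreamPort are names invented for this example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ImmediateDedupSketch {
    // Hypothetical wrapper: every copy of one control tuple shares the id given at its origin.
    static final class Envelope {
        final String id;
        final String payload;

        Envelope(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    // Toy model of a downstream port fed by several upstream partitions.
    static class DownstreamPort {
        private final Set<String> seenInWindow = new HashSet<>();
        final List<String> delivered = new ArrayList<>();

        // IMMEDIATE delivery: the first copy is delivered right away, later copies are dropped.
        void onControl(Envelope envelope) {
            if (seenInWindow.add(envelope.id)) {
                delivered.add(envelope.payload);
            }
        }

        // Assumption of this sketch: the duplicate filter only needs to span one window.
        void endWindow() {
            seenInWindow.clear();
        }
    }

    public static void main(String[] args) {
        DownstreamPort port = new DownstreamPort();
        port.onControl(new Envelope("ct-1", "EOF"));   // copy arriving via partition 1
        port.onControl(new Envelope("ct-1", "EOF"));   // copy arriving via partition 2, filtered
        port.onControl(new Envelope("ct-2", "FLUSH")); // generated inside a partition, unique
        System.out.println(port.delivered);
    }
}
```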
The END_WINDOW delivery type delivers the control tuple to the operator only after all data tuples of the window have been delivered to the operator. In the operator lifecycle, this means that the control tuples are delivered just before the endWindow call.
Case: Downstream is partitioned
When the downstream is partitioned, the control tuple emitted by the upstream is broadcast to all downstream partitions, buffered in each partition until the end of the window, and delivered to the operator just before the endWindow call.
Case: Upstream is partitioned
If the control tuples are generated in any subset of the partitions, then each control tuple is unique and is delivered to the downstream operator before the endWindow call. However, if the source of the control tuple is an operator further upstream, then the downstream operator filters out duplicates as and when each control tuple arrives at the operator, and finally all unique control tuples are delivered to the operator just before the endWindow call.
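The END_WINDOW behavior, buffering with duplicate filtering on arrival and delivery just before endWindow, can be sketched as follows. This is a toy model and not the engine's code: BufferingPort is a hypothetical class, and the id used to spot duplicates is an assumption of the sketch, since the text does not describe how copies are matched.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class EndWindowBufferSketch {
    // Toy model of a control aware port handling END_WINDOW control tuples.
    static class BufferingPort {
        // Keyed by a hypothetical origin id; insertion order is kept for delivery.
        private final Map<String, String> buffered = new LinkedHashMap<>();
        final List<String> events = new ArrayList<>();

        void onData(String tuple) {
            events.add("data:" + tuple); // data tuples are processed as they arrive
        }

        void onControl(String id, String payload) {
            buffered.putIfAbsent(id, payload); // duplicates are dropped on arrival
        }

        void onEndWindow() {
            for (String payload : buffered.values()) {
                events.add("control:" + payload); // delivered after all data tuples
            }
            buffered.clear();
            events.add("endWindow"); // the endWindow callback comes last
        }
    }

    public static void main(String[] args) {
        BufferingPort port = new BufferingPort();
        port.onControl("ct-1", "EOF"); // arrives early in the window, held back
        port.onData("x");
        port.onControl("ct-1", "EOF"); // duplicate from another partition
        port.onData("y");
        port.onEndWindow();
        System.out.println(port.events);
    }
}
```

Because the buffer lives in operator memory until the window closes, the sketch also hints at why control tuples should stay few and small.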
All the user defined control tuples used in the application are cached in the memory of the operator for the duration of a window. For this reason, it is imperative that both the size and the number of control tuples emitted within a window are small compared to the number of data tuples.