The Transform operator maps field expressions from input to output, or converts fields from one type to another. The operator is stateless. It receives objects on its input port and, for each input object, creates a new output object whose fields are computed as expressions involving the fields of the input object. The types of the input and output objects are configurable, as are the expressions used to compute the output fields.
The operator class is TransformOperator, located in the package com.datatorrent.lib.transform. Please refer to the GitHub URL for TransformOperator.
Consider data that needs to be transformed to match an output schema.
Suppose the input objects have these fields:
Name | Type |
---|---|
FirstName | String |
LastName | String |
Phone | String |
DateOfBirth | java.util.Date |
Address | String |
and output objects with fields:
Name | Type |
---|---|
Name | String |
Phone | String |
Age | Integer |
Address | String |
Suppose Name is a concatenation of FirstName and LastName, and Age is computed by subtracting the year of DateOfBirth from the current year.
These simple computations can be expressed as Java expressions, in which the input object is represented by $, and supplied as configuration parameters as follows:
Name => {$.FirstName}.concat(" ").concat({$.LastName})
Age => (new java.util.Date()).getYear() - {$.DateOfBirth}.getYear()
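To make the two expressions concrete, here is a minimal plain-Java sketch of the same computation done by hand, without the operator. The class names PersonInput, PersonOutput and TransformSketch are hypothetical and are not part of the Malhar library. The current date is passed in as a parameter so the result is reproducible, and Phone and Address are simply copied across.

```java
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;

// Hypothetical input type mirroring the input field table above.
class PersonInput {
    String firstName;
    String lastName;
    String phone;
    Date dateOfBirth;
    String address;
}

// Hypothetical output type mirroring the output field table above.
class PersonOutput {
    String name;
    String phone;
    Integer age;
    String address;
}

final class TransformSketch {
    // Computes Name and Age the same way the two expressions do.
    // Date.getYear() is deprecated (it returns year minus 1900), but the
    // offset cancels out in the subtraction.
    @SuppressWarnings("deprecation")
    static PersonOutput transform(PersonInput in, Date now) {
        PersonOutput out = new PersonOutput();
        out.name = in.firstName.concat(" ").concat(in.lastName);
        out.age = now.getYear() - in.dateOfBirth.getYear();
        out.phone = in.phone;     // same name in input and output: copied as-is
        out.address = in.address; // same name in input and output: copied as-is
        return out;
    }

    public static void main(String[] args) {
        PersonInput in = new PersonInput();
        in.firstName = "John";
        in.lastName = "Smith";
        in.phone = "555-0100";
        in.address = "1 Main St";
        in.dateOfBirth = new GregorianCalendar(1990, Calendar.JANUARY, 1).getTime();
        Date now = new GregorianCalendar(2020, Calendar.JUNE, 15).getTime();
        PersonOutput out = transform(in, now);
        System.out.println(out.name + ", age " + out.age);
    }
}
```

Note that the Age expression is a difference of calendar years only; it does not check whether the birthday has already occurred in the current year.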
expressionMap - Map<String, String>
expressionFunctions - List
copyMatchingFields - boolean
If a matching field is also specified in expressionMap, the operator skips copying it to the output object.
Consider input objects with fields:
Name | Type |
---|---|
FirstName | String |
LastName | String |
StartDate | org.joda.time.DateTime |
and output objects with fields:
Name | Type |
---|---|
Name | String |
isLeapYear | Boolean |
Note: The org.joda.time.DateTime class is not present in the default list, so we need to add this library to expressionFunctions in the populateDAG method, as shown below:
TransformOperator operator = dag.addOperator("transform", new TransformOperator());
// Make the Joda-Time and Commons Lang classes available to the expressions.
operator.setExpressionFunctions(Arrays.asList("org.joda.time.DateTime", "org.apache.commons.lang3.StringUtils"));
// Map each output field name to the expression that computes it.
Map<String, String> expressionMap = new HashMap<>();
expressionMap.put("isLeapYear", "{$.StartDate}.year().isLeap()");
expressionMap.put("Name", "org.apache.commons.lang3.StringUtils.joinWith(\" \", {$.FirstName}, {$.LastName})");
operator.setExpressionMap(expressionMap);
The above properties can also be set in the properties file as follows:
<property>
  <name>dt.operator.transform.expressionFunctions[0]</name>
  <value>org.joda.time.DateTime</value>
</property>
<property>
  <name>dt.operator.transform.expressionFunctions[1]</name>
  <value>org.apache.commons.lang3.StringUtils</value>
</property>
<property>
  <name>dt.operator.transform.expressionMap(isLeapYear)</name>
  <value>{$.StartDate}.year().isLeap()</value>
</property>
<property>
  <name>dt.operator.transform.expressionMap(Name)</name>
  <value>org.apache.commons.lang3.StringUtils.joinWith(" ", {$.FirstName}, {$.LastName})</value>
</property>
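As a rough illustration of what the two configured expressions compute, the following sketch reproduces them in plain Java. It is an approximation under stated assumptions: java.time.LocalDate stands in for org.joda.time.DateTime and String.join stands in for StringUtils.joinWith, so that the example runs with only the JDK. The class name LeapYearSketch is hypothetical.

```java
import java.time.LocalDate;

final class LeapYearSketch {
    // Stand-in for {$.StartDate}.year().isLeap(), using java.time instead of Joda-Time.
    static boolean isLeapYear(LocalDate startDate) {
        return startDate.isLeapYear();
    }

    // Stand-in for StringUtils.joinWith(" ", firstName, lastName).
    static String fullName(String firstName, String lastName) {
        return String.join(" ", firstName, lastName);
    }

    public static void main(String[] args) {
        LocalDate startDate = LocalDate.of(2020, 3, 1);
        System.out.println(fullName("Ada", "Lovelace") + " " + isLeapYear(startDate));
    }
}
```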
input - Port for input tuples.
output - Port for transformed output tuples.
Input port attribute - input.TUPLE_CLASS - Fully qualified class name of the input tuples. The class should be Kryo serializable.
Output port attribute - output.TUPLE_CLASS - Fully qualified class name of the output tuples. The class should be Kryo serializable.
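As an illustration of the kind of class that could be named in TUPLE_CLASS, here is a minimal sketch of a tuple class for the Name / Phone / Age / Address output schema used earlier. The class name PersonRecord is hypothetical. The sketch assumes Kryo's default field-based serialization, which generally needs a no-argument constructor; in a real application the class would be public and live in its own file.

```java
// Hypothetical output tuple class; not part of the Malhar library.
class PersonRecord {
    private String name;
    private String phone;
    private Integer age;
    private String address;

    // No-argument constructor so the serializer can instantiate the class.
    public PersonRecord() {
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }

    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }

    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
}
```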
Please refer to Example for a sample transform application.
Being stateless, this operator can be partitioned using any of the built-in partitioners present in the Malhar library by setting a few properties as follows:
Stateless partitioning ensures that TransformOperator is partitioned at application startup and remains partitioned throughout the lifetime of the DAG. TransformOperator can be statelessly partitioned by adding the following lines to properties.xml:
<property>
  <name>dt.operator.{OperatorName}.attr.PARTITIONER</name>
  <value>com.datatorrent.common.partitioner.StatelessPartitioner:{N}</value>
</property>
where {OperatorName} is the name of the TransformOperator operator and {N} is the number of static partitions. The above lines statically partition TransformOperator into {N} partitions.
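To show how the two placeholders are filled in, the following small sketch builds the property name and value for a given operator name and partition count. The helper class StatelessPartitionProperty is hypothetical and exists only for illustration; it does nothing beyond string formatting.

```java
final class StatelessPartitionProperty {
    // Builds the property name for the operator called operatorName.
    static String name(String operatorName) {
        return "dt.operator." + operatorName + ".attr.PARTITIONER";
    }

    // Builds the property value requesting the given number of static partitions.
    static String value(int partitions) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be at least 1");
        }
        return "com.datatorrent.common.partitioner.StatelessPartitioner:" + partitions;
    }

    public static void main(String[] args) {
        System.out.println(name("transform") + " = " + value(4));
    }
}
```

For an operator added to the DAG under the name transform with four partitions, this yields the name dt.operator.transform.attr.PARTITIONER and the value com.datatorrent.common.partitioner.StatelessPartitioner:4.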
Dynamic partitioning is a feature of the Apex platform that changes the partitioning of an operator based on certain conditions. TransformOperator can be dynamically partitioned using the below two partitioners:
The following code can be added to the populateDAG(DAG dag, Configuration conf) method of the application to dynamically partition TransformOperator:
StatelessThroughputBasedPartitioner<TransformOperator> partitioner = new StatelessThroughputBasedPartitioner<>();
partitioner.setCooldownMillis(10000);
partitioner.setMaximumEvents(30000);
partitioner.setMinimumEvents(10000);
dag.setAttribute(transform, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
dag.setAttribute(transform, OperatorContext.PARTITIONER, partitioner);
The above code dynamically partitions TransformOperator when the throughput changes. If the overall throughput of TransformOperator rises above 30000 or falls below 10000, the platform repartitions TransformOperator so that the throughput of a single partition stays between 10000 and 30000. The CooldownMillis value of 10000 is used as the threshold time over which the throughput change is observed.
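The interplay between the two thresholds and the cooldown can be sketched as a small decision model. This is a simplified illustration only, not the actual StatelessThroughputBasedPartitioner implementation: the class ThroughputWatcher and its Action enum are hypothetical, and the sketch assumes that a throughput outside the configured band must persist for the whole cooldown period before a repartition is requested.

```java
final class ThroughputWatcher {
    enum Action { NONE, ADD_PARTITIONS, REMOVE_PARTITIONS }

    private final long minimumEvents;
    private final long maximumEvents;
    private final long cooldownMillis;

    private Action pending = Action.NONE; // what the latest observations call for
    private long pendingSinceMillis;      // when that condition was first seen

    ThroughputWatcher(long minimumEvents, long maximumEvents, long cooldownMillis) {
        this.minimumEvents = minimumEvents;
        this.maximumEvents = maximumEvents;
        this.cooldownMillis = cooldownMillis;
    }

    // Records one throughput observation and returns the action to take, if any.
    Action observe(long throughput, long nowMillis) {
        Action wanted = throughput > maximumEvents ? Action.ADD_PARTITIONS
                : throughput < minimumEvents ? Action.REMOVE_PARTITIONS
                : Action.NONE;
        if (wanted != pending) {
            // The condition changed, so restart the observation window.
            pending = wanted;
            pendingSinceMillis = nowMillis;
            return Action.NONE;
        }
        if (wanted != Action.NONE && nowMillis - pendingSinceMillis >= cooldownMillis) {
            // The condition held for the full cooldown: act and start a new window.
            pendingSinceMillis = nowMillis;
            return wanted;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        ThroughputWatcher watcher = new ThroughputWatcher(10000, 30000, 10000);
        System.out.println(watcher.observe(35000, 1000));
        System.out.println(watcher.observe(40000, 11000));
    }
}
```

With the values from the snippet above (minimum 10000, maximum 30000, cooldown 10000 ms), a throughput of 35000 first seen at t = 1000 ms only triggers a request for more partitions once it is still above the maximum at t = 11000 ms or later.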
Source code for this dynamic application can be found here.