This operator receives a POJO (Plain Old Java Object) as an incoming tuple, converts the data in the incoming POJO to a custom delimited string and emits the delimited string.
CsvFormatter supports schema definition as a JSON string.
CsvFormatter does not hold any state and is idempotent, fault-tolerant and statically/dynamically partitionable.
Property | Description | Type | Mandatory | Default Value |
---|---|---|---|---|
schema | Contents of the schema. The schema is specified in JSON format. | String | Yes | N/A |
Attribute | Description | Type | Mandatory |
---|---|---|---|
in.TUPLE_CLASS | TUPLE_CLASS attribute on the input port, which tells the operator the class of the incoming POJO | Class or FQCN | Yes |
Port | Description | Type | Mandatory |
---|---|---|---|
in | Tuples which need to be formatted are received on this port | Object (POJO) | Yes |
out | Tuples that are formatted are emitted from this port | String | No |
err | Tuples that could not be converted are emitted on this port | Object | No |
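
The port contract in the table above can be pictured with a small plain-Java sketch. It is illustrative only and does not use the Apex port API: the `PortRoutingSketch` class, the `Campaign` POJO, and the `process` method are hypothetical names, two lists stand in for the `out` and `err` ports, and the comma separator is assumed. A tuple that converts cleanly produces a string on `out`; a tuple that cannot be converted is passed through unchanged to `err`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; plain lists stand in for the Apex ports.
class PortRoutingSketch {
    // Stand-in POJO; in a real application this is the class named by TUPLE_CLASS.
    static class Campaign {
        final Integer campaignId;
        final String name;

        Campaign(Integer campaignId, String name) {
            this.campaignId = campaignId;
            this.name = name;
        }
    }

    final List<String> out = new ArrayList<>(); // formatted strings
    final List<Object> err = new ArrayList<>(); // tuples that failed conversion

    // Converts one incoming tuple, or routes it to err if conversion fails.
    void process(Object tuple) {
        try {
            Campaign c = (Campaign) tuple;                    // fails for any other type
            out.add(c.campaignId.toString() + "," + c.name);  // fails if campaignId is null
        } catch (RuntimeException e) {
            err.add(tuple);                                   // original object, not a string
        }
    }

    public static void main(String[] args) {
        PortRoutingSketch sketch = new PortRoutingSketch();
        sketch.process(new Campaign(7, "spring"));
        sketch.process("not a pojo");
        System.out.println("out=" + sketch.out + " err=" + sketch.err);
    }
}
```

Note that the `err` list holds the original object rather than a string, which matches the `Object` type listed for the `err` port.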
The current CsvFormatter has the following limitations:
An example for CsvFormatter can be found at: https://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter
CsvFormatter expects the schema to be a String in JSON format.
Example schema:

    {
      "separator": ",",
      "quoteChar": "\"",
      "lineDelimiter": "\n",
      "fields": [
        {
          "name": "campaignId",
          "type": "Integer"
        },
        {
          "name": "startDate",
          "type": "Date",
          "constraints": {
            "format": "yyyy-MM-dd"
          }
        }
      ]
    }
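
To make the example schema concrete, the sketch below renders one record with the two declared fields. It is not CsvFormatter's implementation: the `SchemaSketch` class and `formatRecord` method are hypothetical, the schema values are hard-coded rather than parsed from JSON, and it assumes that the line delimiter is appended to every record and that neither value needs quoting.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;

// Hypothetical illustration only; not part of the Malhar library.
class SchemaSketch {
    // Values copied from the example schema.
    static final String SEPARATOR = ",";
    static final String LINE_DELIMITER = "\n";
    static final String DATE_FORMAT = "yyyy-MM-dd"; // "constraints.format" of startDate

    // Renders one record with the fields campaignId (Integer) and startDate (Date).
    static String formatRecord(int campaignId, Date startDate) {
        SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT, Locale.ROOT);
        return campaignId + SEPARATOR + dateFormat.format(startDate) + LINE_DELIMITER;
    }

    public static void main(String[] args) {
        Date start = new GregorianCalendar(2016, Calendar.MARCH, 8).getTime();
        System.out.print(formatRecord(1234, start)); // prints 1234,2016-03-08
    }
}
```

The field order in the output follows the order of the `fields` array, and the `format` constraint controls only how the `Date` value is written.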
Because CsvFormatter is a stateless operator, the built-in partitioners in the Malhar library can be used directly by setting properties as follows:
With stateless partitioning, CsvFormatter is partitioned at the start of the application and remains partitioned throughout the lifetime of the DAG. CsvFormatter can be stateless partitioned by adding the following lines to properties.xml:

    <property>
      <name>dt.operator.{OperatorName}.attr.PARTITIONER</name>
      <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
    </property>
where {OperatorName} is the name of the CsvFormatter operator. The lines above create 2 static partitions of CsvFormatter. Change the number after the colon to change the number of static partitions.
Dynamic partitioning is a feature of the Apex platform that changes the partitioning of an operator based on certain conditions. CsvFormatter can be dynamically partitioned using the out-of-the-box StatelessThroughputBasedPartitioner.
The following code can be added to the populateDAG method of the application to dynamically partition CsvFormatter:

    StatelessThroughputBasedPartitioner<CsvFormatter> partitioner = new StatelessThroughputBasedPartitioner<>();
    partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
    partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
    partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
    dag.setAttribute(csvFormatter, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
    dag.setAttribute(csvFormatter, OperatorContext.PARTITIONER, partitioner);
The code above dynamically partitions CsvFormatter when throughput changes. If the overall throughput of CsvFormatter rises above 30000 or falls below 10000, the platform repartitions CsvFormatter so that the throughput of a single partition stays between 10000 and 30000. The CooldownMillis value of 10000 is the threshold time, in milliseconds, over which the throughput change is observed.
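
The threshold rule described above can be written out as a small decision function. This is a simplified sketch under stated assumptions, not the logic of StatelessThroughputBasedPartitioner itself: the `ThroughputRuleSketch` class, the `Action` enum, and the `decide` method are hypothetical, and the cooldown is modeled as the time the throughput must have stayed outside the band before any action is taken.

```java
// Hypothetical illustration only; the real decision is made by the platform.
class ThroughputRuleSketch {
    enum Action { SCALE_UP, SCALE_DOWN, NONE }

    // Default values used in the populateDAG snippet.
    static final long MAX_EVENTS = 30000;
    static final long MIN_EVENTS = 10000;
    static final long COOLDOWN_MILLIS = 10000;

    // throughput: observed throughput; millisOutsideBand: how long it has been
    // above MAX_EVENTS or below MIN_EVENTS (assumed meaning of the cooldown).
    static Action decide(long throughput, long millisOutsideBand) {
        if (millisOutsideBand < COOLDOWN_MILLIS) {
            return Action.NONE;        // change not observed for long enough
        }
        if (throughput > MAX_EVENTS) {
            return Action.SCALE_UP;    // too much load: add partitions
        }
        if (throughput < MIN_EVENTS) {
            return Action.SCALE_DOWN;  // too little load: remove partitions
        }
        return Action.NONE;            // inside the band: leave as is
    }

    public static void main(String[] args) {
        System.out.println(decide(45000, 12000)); // SCALE_UP
        System.out.println(decide(5000, 12000));  // SCALE_DOWN
        System.out.println(decide(20000, 12000)); // NONE
        System.out.println(decide(45000, 3000));  // NONE
    }
}
```

With these defaults, a short spike to 45000 that lasts only 3 seconds triggers nothing, while the same level sustained for 12 seconds asks for more partitions.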