The XmlParser operator parses XML records and constructs POJOs (“Plain Old Java Objects”) from them. The operator also emits each record as a DOM Document if the relevant output port is connected. User can also provide a XSD (XML Schema Definition) to validate incoming XML records. Valid records will be emitted as POJOs / DOM Document while invalid ones are emitted on error port with an error message if the error port is connected.
XmlParser is idempotent, fault-tolerant and statically/dynamically partitionable.
Property | Description | Type | Mandatory | Default Value |
---|---|---|---|---|
schemaXSDFile | [XSD] describing XML data. Incoming records can be validated using the schemaXSDFile. If the data is not as per the requirements specified in schemaXSDFile, they are emitted on the error port. This is an optional property. If the XSD is not provided, incoming tuples are simply converted to POJOs or DOM Documents without any validations | String | No | N/A |
Attribute | Description | Type | Mandatory |
---|---|---|---|
out.TUPLE_CLASS | TUPLE_CLASS attribute on output port which tells operator the class of POJO which needs to be emitted. The name of the field members of the class must match with the names in incoming POJO. The operator ignores unknown properties i.e. fields present in POJO but not in TUPLE_CLASS or vice versa. | Class or FQCN | Yes |
Port | Description | Type | Mandatory |
---|---|---|---|
in | Tuples that needs to be parsed are received on this port | byte[] | Yes |
out | Valid Tuples that are emitted as pojo. Tuples are converted to POJO only if the port is connected. | Object (POJO) | No |
parsedOutput | Valid Tuples that are emitted as DOM Document. Tuples are converted to DOM Document only if the port is connected. | DOM Document | No |
err | Invalid Tuples are emitted with error message. Invalid tuples are discarded if the port is not connected. | KeyValPair <String, String> | No |
XML Parser is both statically and dynamically partitionable.
This can be achieved in 2 ways
XmlParser xmlParser = dag.addOperator("xmlParser", XmlParser.class); StatelessPartitioner<XmlParser> partitioner1 = new StatelessPartitioner<XmlParser>(2); dag.setAttribute(xmlParser, Context.OperatorContext.PARTITIONER, partitioner1 );
<property> <name>dt.operator.{OperatorName}.attr.PARTITIONER</name> <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value> </property>
where {OperatorName} is the name of the XmlParser operator. Above lines will partition XmlParser statically 2 times. Above value can be changed accordingly to change the number of static partitions.
XmlParser can be dynamically partitioned using an out-of-the-box partitioner:
Following code can be added to ‘populateDAG’ method of application to dynamically partition XmlParser:
XmlParser xmlParser = dag.addOperator("xmlParser", XmlParser.class); StatelessThroughputBasedPartitioner<XmlParser> partitioner = new StatelessThroughputBasedPartitioner<>(); partitioner.setCooldownMillis(conf.getLong("dt.cooldown", 10000)); partitioner.setMaximumEvents(conf.getLong("dt.maxThroughput", 30000)); partitioner.setMinimumEvents(conf.getLong("dt.minThroughput", 10000)); dag.setAttribute(xmlParser, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner})); dag.setAttribute(xmlParser, OperatorContext.PARTITIONER, partitioner);
Above code will dynamically partition XmlParser when the throughput changes. If the overall throughput of XmlParser goes beyond 30000 or less than 10000, the platform will repartition XmlParser to balance throughput of a single partition to be between 10000 and 30000. ‘dt.cooldown’ of 10000 will be used as the threshold time for which the throughput change is observed.
Example for Xml Parser can be found at: https://github.com/DataTorrent/examples/tree/master/tutorials/parser