Purpose of JsonParser operator is to parse JSON records and construct a Plain Old Java Object (“POJO”) out of it. The operator also emits each record as JSONObject if the relevant output port is connected. User can also provide a schema describing JSON data to validate incoming JSON records. Valid records will be emitted as POJO / JSONObject while invalid ones are emitted on error port with the error message if the error port is connected.
Json Parser is idempotent, fault-tolerant & statically/dynamically partitionable.
Property | Description | Type | Mandatory | Default Value |
---|---|---|---|---|
jsonSchema | Schema describing JSON data. Incoming records can be validated using the jsonSchema. If the data is not as per the requirements specified in jsonSchema, they are emitted on the error port.This is an optional property. If the schema is not provided, incoming tuples are simply converted to POJO or JSONObject without any validations | String | NO | N/A |
Attribute | Description | Type | Mandatory |
---|---|---|---|
out.TUPLE_CLASS | TUPLE_CLASS attribute on output port which tells operator the class of POJO which needs to be emitted. The name of the field members of the class must match with the names in incoming POJO. The operator ignores unknown properties. | Class or FQCN | Yes |
Port | Description | Type | Mandatory |
---|---|---|---|
in | Tuples that needs to be parsed are recieved on this port | byte[] | Yes |
out | Valid Tuples that are emitted as pojo. Tuples are converted to POJO only if the port is connected. | Object (POJO) | No |
parsedOutput | Valid Tuples that are emitted as JSONObject. Tuples are converted to JSONObject only if the port is connected. | JSONObject | No |
err | Invalid Tuples are emitted with error message. Invaid tuples are discarded if the port is not connected. | KeyValPair <String, String> | No |
JSON Parser is both statically and dynamically partitionable.
This can be achieved in 2 ways
JsonParser jsonParser = dag.addOperator("jsonParser", JsonParser.class); StatelessPartitioner<JsonParser> partitioner1 = new StatelessPartitioner<JsonParser>(2); dag.setAttribute(jsonParser, Context.OperatorContext.PARTITIONER, partitioner1 );
<property> <name>dt.operator.{OperatorName}.attr.PARTITIONER</name> <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value> </property>
where {OperatorName} is the name of the JsonParser operator. Above lines will partition JsonParser statically 2 times. Above value can be changed accordingly to change the number of static partitions.
JsonParser can be dynamically partitioned using an out-of-the-box partitioner:
Following code can be added to populateDAG method of application to dynamically partition JsonParser:
JsonParser jsonParser = dag.addOperator("jsonParser", JsonParser.class); StatelessThroughputBasedPartitioner<JsonParser> partitioner = new StatelessThroughputBasedPartitioner<>(); partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000)); partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000)); partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000)); dag.setAttribute(jsonParser, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner})); dag.setAttribute(jsonParser, OperatorContext.PARTITIONER, partitioner);
Above code will dynamically partition JsonParser when the throughput changes. If the overall throughput of JsonParser goes beyond 30000 or less than 10000, the platform will repartition JsonParser to balance throughput of a single partition to be between 10000 and 30000. CooldownMillis of 10000 will be used as the threshold time for which the throughput change is observed.
Example for Json Parser can be found at: https://github.com/DataTorrent/examples/tree/master/tutorials/parser