/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.parser;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.validation.ConstraintViolationException;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.parser.JsonParserTest.Product;
import com.datatorrent.lib.appdata.schemas.SchemaUtils;
import com.datatorrent.lib.io.ConsoleOutputOperator;
/**
 * Application-level test for {@link JsonParser}: wires the parser into a small DAG together with a
 * JSON-emitting input operator and two console sinks, then runs that DAG in local mode.
 *
 * <p>The test makes no assertions about the tuples that flow through the DAG; it only fails when
 * a {@link ConstraintViolationException} is raised while preparing or running it.
 */
public class JsonParserApplicationTest
{
  /**
   * Prepares the {@link JsonParserTest} application in {@link LocalMode} and runs it for a fixed
   * duration.
   *
   * @throws IOException declared by the original signature; kept for compatibility
   * @throws Exception if preparing or running the DAG fails for any reason other than a
   *         constraint violation
   */
  @Test
  public void testApplication() throws IOException, Exception
  {
    try {
      LocalMode lma = LocalMode.newInstance();
      // "false" skips loading the default Hadoop resources, so the configuration starts empty.
      Configuration conf = new Configuration(false);
      lma.prepareDAG(new JsonParserTest(), conf);
      LocalMode.Controller lc = lma.getController();
      lc.run(10000); // runs for 10 seconds and quits
    } catch (ConstraintViolationException e) {
      // Report the individual violations instead of letting the bare exception escape.
      Assert.fail("constraint violations: " + e.getConstraintViolations());
    }
  }

  /**
   * Streaming application under test. It builds this DAG:
   *
   * <pre>
   * data --(input)--> jsonparser --(output, parser.parsedOutput)--> jsonObjectOp
   *                              --(pojo,   parser.out)----------> pojoOp
   * </pre>
   */
  public static class JsonParserTest implements StreamingApplication
  {
    /**
     * Adds the emitter, the parser and the two console operators to {@code dag} and connects them.
     *
     * @param dag the DAG to populate
     * @param conf application configuration; not read by this method
     */
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      JsonDataEmitterOperator input = dag.addOperator("data", new JsonDataEmitterOperator());

      JsonParser parser = dag.addOperator("jsonparser", new JsonParser());
      parser.setClazz(Product.class);
      // The same class is also registered as the tuple class of the parser's "out" port.
      dag.getMeta(parser).getMeta(parser.out).getAttributes().put(Context.PortContext.TUPLE_CLASS, Product.class);
      // The schema is loaded from a resource bundled on the classpath.
      parser.setJsonSchema(SchemaUtils.jarResourceFileToString("json-parser-schema.json"));

      ConsoleOutputOperator jsonObjectOp = dag.addOperator("jsonObjectOp", new ConsoleOutputOperator());
      ConsoleOutputOperator pojoOp = dag.addOperator("pojoOp", new ConsoleOutputOperator());
      // Only the sink attached to parser.parsedOutput has debug enabled.
      jsonObjectOp.setDebug(true);

      dag.addStream("input", input.output, parser.in);
      dag.addStream("output", parser.parsedOutput, jsonObjectOp.input);
      dag.addStream("pojo", parser.out, pojoOp.input);
    }
  }

  /**
   * Input operator that emits the bytes of {@link #jsonSample} on every {@link #emitTuples()}
   * call.
   */
  public static class JsonDataEmitterOperator extends BaseOperator implements InputOperator
  {
    /** JSON document emitted by this operator. */
    public static String jsonSample = "{"
        + "\"id\": 2,"
        + "\"name\": \"An ice sculpture\","
        + "\"price\": 1,"
        + "\"tags\": [\"cold\", \"ice\"],"
        + "\"dimensions\": {"
        + "\"length\": 7.0,"
        + "\"width\" : 8.0,"
        + "\"height\": 9.5"
        + "},"
        + "\"warehouseLocation\": {"
        + "\"latitude\": -78.75,"
        + "\"longitude\": 20.4"
        + "},"
        + "\"dateOfManufacture\": \"2013/09/29\","
        + "\"dateOfExpiry\": \"2013\""
        + "}";

    /** Port on which the encoded sample is emitted. */
    public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<byte[]>();

    /**
     * Emits one tuple containing the sample document encoded as UTF-8.
     */
    @Override
    public void emitTuples()
    {
      // Encode with an explicit charset so the bytes do not depend on the JVM's platform default.
      output.emit(jsonSample.getBytes(StandardCharsets.UTF_8));
    }
  }
}